diff --git a/app/transcript.py b/app/transcript.py
new file mode 100644
index 0000000..371e515
--- /dev/null
+++ b/app/transcript.py
@@ -0,0 +1,389 @@
+import json
+import logging
+import os
+import shutil
+import tempfile
+from datetime import datetime
+from urllib.parse import parse_qs, urlparse
+
+import pytube
+import requests
+import static_ffmpeg
+import whisper
+import yt_dlp
+from clint.textui import progress
+from moviepy.editor import VideoFileClip
+
+from app import __app_name__, __version__, application
+
+
+class Transcript:
+    def __init__(self, source, test_mode=False):
+        self.source = source
+        self.test_mode = test_mode
+        self.logger = logging.getLogger(__app_name__)
+
+    def create_transcript(self):
+        result = ""
+        for x in self.result:
+            result = result + x[2] + " "
+
+        return result
+
+    def process_source(self, tmp_dir=None):
+        tmp_dir = tmp_dir if tmp_dir is not None else tempfile.mkdtemp()
+        self.audio_file = self.source.process(tmp_dir)
+        self.title = self.source.title if self.source.title else self.audio_file[:-4]
+        return self.audio_file, tmp_dir
+
+    def transcribe(self, working_dir, generate_chapters, summarize_transcript, service, diarize, upload, model_output_dir, test_transcript=None):
+
+        def process_mp3():
+            """using whisper"""
+            self.logger.info("Transcribing audio to text using whisper ...")
+            try:
+                my_model = whisper.load_model(service)
+                result = my_model.transcribe(self.audio_file)
+                data = []
+                for x in result["segments"]:
+                    data.append(tuple((x["start"], x["end"], x["text"])))
+                data_path = application.generate_srt(
+                    data, self.title, model_output_dir)
+                if upload:
+                    application.upload_file_to_s3(data_path)
+                return data
+            except Exception as e:
+                self.logger.error(
+                    f"(whisper,{service}) Error transcribing audio to text: {e}")
+                return
+
+        def write_chapters_file():
+            """Write out the chapter file based on simple MP4 format (OGM)"""
+            try:
+                if generate_chapters and len(self.source.chapters) > 0:
+                    self.logger.info("Chapters detected")
+                    chapters_file = os.path.join(working_dir, os.path.basename(
+                        self.audio_file)[:-4] + ".chapters")
+
+                    with open(chapters_file, "w") as fo:
+                        for current_chapter in self.source.chapters:
+                            fo.write(
+                                f"CHAPTER{current_chapter[0]}="
+                                f"{current_chapter[1]}\n"
+                                f"CHAPTER{current_chapter[0]}NAME="
+                                f"{current_chapter[2]}\n"
+                            )
+                        fo.close()
+                    return True
+                else:
+                    return False
+            except Exception as e:
+                raise Exception(f"Error writing chapters file: {e}")
+
+        try:
+            self.summary = None
+            if self.test_mode:
+                self.result = test_transcript if test_transcript is not None else "test-mode"
+                return self.result
+            if not self.audio_file:
+                # TODO give audio file path as argument
+                raise Exception(
+                    "audio file is missing, you need to process_source() first")
+
+            has_chapters = write_chapters_file()
+            self.result = None
+            if service == "deepgram" or summarize_transcript:
+                deepgram_resp = application.process_mp3_deepgram(
+                    self.audio_file, summarize_transcript, diarize)
+                self.result = application.get_deepgram_transcript(
+                    deepgram_resp, diarize, self.title, upload, model_output_dir)
+
+                if summarize_transcript:
+                    self.summary = application.get_deepgram_summary(
+                        deepgram_resp)
+
+                if service == "deepgram" and has_chapters:
+                    if diarize:
+                        self.result = application.combine_deepgram_chapters_with_diarization(
+                            deepgram_data=deepgram_resp, chapters=self.source.chapters
+                        )
+                    else:
+                        self.result = application.combine_deepgram_with_chapters(
+                            deepgram_data=deepgram_resp, chapters=self.source.chapters
+                        )
+
+            if not service == "deepgram":
+                # whisper
+                self.result = process_mp3()
+                if has_chapters:
+                    # this is only available for videos, for now
+                    self.result = application.combine_chapter(
+                        chapters=self.source.chapters,
+                        transcript=self.result,
+                        working_dir=working_dir
+                    )
+                else:
+                    # finalize transcript
+                    self.result = self.create_transcript()
+
+            return self.result
+
+        except Exception as e:
+            raise Exception(f"Error while transcribing audio source: {e}")
+
+    def write_to_file(self, working_dir, transcript_by):
+        """Writes transcript to a markdown file and returns its path
+        This file is submitted as part of the Pull Request to the
+        bitcointranscripts repo
+        """
+
+        def process_metadata(key, value):
+            if value:
+                value = value.strip()
+                value = [item.strip() for item in value.split(",")]
+                return f"{key}: {value}\n"
+            return ""
+
+        self.logger.info("Creating markdown file with transcription...")
+        try:
+            # Add metadata prefix
+            meta_data = (
+                "---\n"
+                f"title: {self.title}\n"
+                f"transcript_by: {transcript_by} via TBTBTC v{__version__}\n"
+            )
+            if not self.source.local:
+                meta_data += f"media: {self.source.source_file}\n"
+            meta_data += process_metadata("tags", self.source.tags)
+            meta_data += process_metadata("speakers", self.source.speakers)
+            meta_data += process_metadata("categories",
+                                          self.source.category)
+            if self.summary:
+                meta_data += f"summary: {self.summary}\n"
+            if self.source.event_date:
+                meta_data += f"date: {self.source.event_date}\n"
+            meta_data += "---\n"
+            # Write to file
+            file_name = self.title.replace(" ", "-")
+            file_name_with_ext = os.path.join(
+                working_dir, file_name + ".md")
+            with open(file_name_with_ext, "a") as opf:
+                opf.write(meta_data + "\n")
+                opf.write(self.result + "\n")
+                opf.close()
+            return os.path.abspath(file_name_with_ext)
+        except Exception as e:
+            self.logger.error(f"Error writing to file: {e}")
+
+
+class Source:
+    def __init__(self, source_file, local, title, date, tags, category, speakers):
+        self.source_file = source_file
+        self.local = local
+        self.__config_event_date(date)
+        self.title = title
+        self.tags = tags
+        self.category = category
+        self.speakers = speakers
+        self.logger = logging.getLogger(__app_name__)
+
+    def __config_event_date(self, date):
+        self.event_date = None
+        if date:
+            try:
+                self.event_date = datetime.strptime(date, "%Y-%m-%d").date()
+            except ValueError as e:
+                raise ValueError(f"Supplied date is invalid: {e}")
+        return
+
+    def initialize(self):
+        try:
+            # FFMPEG installed on first use.
+            self.logger.debug("Initializing FFMPEG...")
+            static_ffmpeg.add_paths()
+            self.logger.debug("Initialized FFMPEG")
+        except Exception as e:
+            raise Exception(f"Error initializing FFMPEG: {e}")
+
+
+class Audio(Source):
+    def __init__(self, source_file, local, title, date, tags, category, speakers):
+        super().__init__(source_file, local, title, date, tags, category, speakers)
+        self.type = "audio"
+
+    def process(self, working_dir):
+        """Process audio"""
+
+        def download_audio():
+            """Helper method to download an audio file"""
+            if self.local:
+                raise Exception(f"{self.source_file} is a local file")
+            if self.title is None:
+                raise Exception("Please supply a title for the audio file")
+            self.logger.info(f"Downloading audio file: {self.source_file}")
+            try:
+                audio = requests.get(self.source_file, stream=True)
+                with open(os.path.join(working_dir, self.title + ".mp3"), "wb") as f:
+                    total_length = int(audio.headers.get("content-length"))
+                    for chunk in progress.bar(
+                        audio.iter_content(chunk_size=1024),
+                        expected_size=(total_length / 1024) + 1,
+                    ):
+                        if chunk:
+                            f.write(chunk)
+                            f.flush()
+                filename = os.path.join(working_dir, self.title + ".mp3")
+                return os.path.abspath(filename)
+            except Exception as e:
+                raise Exception(f"Error downloading audio file: {e}")
+
+        try:
+            self.logger.info(f"Audio file detected: '{self.title}'")
+            if not self.local:
+                # download audio file from the internet
+                filename = download_audio()
+                abs_path = os.path.abspath(path=filename)
+                self.logger.info(f"Audio file stored in: {abs_path}")
+            else:
+                # calculate the absolute path of the local audio file
+                filename = self.source_file.split("/")[-1]
+                abs_path = os.path.abspath(self.source_file)
+                self.logger.info(f"Processing audio file: {abs_path}")
+            if filename.endswith("wav"):
+                self.initialize()
+                abs_path = application.convert_wav_to_mp3(
+                    abs_path=abs_path, filename=filename, working_dir=working_dir
+                )
+            # return the audio file that is now ready for transcription
+            return abs_path
+
+        except Exception as e:
+            raise Exception(f"Error processing audio file: {e}")
+
+
+class Video(Source):
+    def __init__(self, source_file, local, title, date, tags, category, speakers):
+        super().__init__(source_file, local, title, date, tags, category, speakers)
+        self.type = "video"
+        self.__config_source()
+
+    def __config_source(self):
+        if not self.local:
+            # calculate youtube url
+            video_id = self.source_file  # user gave just the youtube video id
+            if "watch?v=" in self.source_file:
+                parsed_url = urlparse(self.source_file)
+                video_id = parse_qs(parsed_url.query)["v"][0]
+            elif "youtu.be" in self.source_file or "embed" in self.source_file:
+                video_id = self.source_file.split("/")[-1]
+            self.source_file = "https://www.youtube.com/watch?v=" + video_id
+            if self.event_date is None:
+                # get the date from the published youtube video
+                video = pytube.YouTube(self.source_file)
+                self.event_date = str(video.publish_date).split(" ")[0]
+
+    def process(self, working_dir):
+        """Process video"""
+
+        def download_video():
+            """Helper method to download a YouTube video"""
+            if self.local:
+                raise Exception(f"{self.source_file} is a local file")
+            try:
+                # download video from youtube
+                self.logger.info(f"Downloading video: {self.source_file}")
+
+                ydl_opts = {
+                    "format": "18",
+                    "outtmpl": os.path.join(working_dir, "videoFile.%(ext)s"),
+                    "nopart": True,
+                    "writeinfojson": True,
+                }
+                with yt_dlp.YoutubeDL(ydl_opts) as ytdl:
+                    ytdl.download([self.source_file])
+
+                with open(os.path.join(working_dir, "videoFile.info.json")) as file:
+                    info = ytdl.sanitize_info(json.load(file))
+                    name = info["title"].replace("/", "-")
+                    file.close()
+
+                os.rename(
+                    os.path.join(working_dir, "videoFile.mp4"),
+                    os.path.join(working_dir, name + ".mp4"),
+                )
+
+                return os.path.abspath(os.path.join(working_dir, name + ".mp4"))
+            except Exception as e:
+                shutil.rmtree(working_dir)
+                raise Exception(f"Error downloading video: {e}")
+
+        def convert_video_to_mp3(video_file):
+            try:
+                clip = VideoFileClip(video_file)
+                self.logger.info(f"Converting {video_file} to mp3...")
+                clip.audio.write_audiofile(
+                    os.path.join(working_dir, video_file.split("/")
+                                 [-1][:-4] + ".mp3")
+                )
+                clip.close()
+                self.logger.info("Video converted to mp3")
+                return os.path.join(working_dir, video_file.split("/")[-1][:-4] + ".mp3")
+            except Exception as e:
+                raise Exception(f"Error converting video to mp3: {e}")
+
+        def extract_chapters_from_downloaded_video_metadata():
+            try:
+                list_of_chapters = []
+                with open(f"{working_dir}/videoFile.info.json", "r") as f:
+                    info = json.load(f)
+                if "chapters" not in info:
+                    self.logger.info("No chapters found for downloaded video")
+                    return list_of_chapters
+                for index, x in enumerate(info["chapters"]):
+                    name = x["title"]
+                    start = x["start_time"]
+                    list_of_chapters.append((str(index), start, str(name)))
+
+                return list_of_chapters
+            except Exception as e:
+                self.logger.error(
+                    f"Error reading downloaded video's metadata: {e}")
+                return []
+
+        try:
+            if not self.local:
+                abs_path = download_video()
+                self.chapters = extract_chapters_from_downloaded_video_metadata()
+            else:
+                abs_path = os.path.abspath(self.source_file)
+
+            self.initialize()
+            audio_file = convert_video_to_mp3(abs_path)
+            return audio_file
+
+        except Exception as e:
+            raise Exception(f"Error processing video file: {e}")
+
+
+class Playlist(Source):
+    def __init__(self, source_file, local, title, date, tags, category, speakers):
+        super().__init__(source_file, local, title, date, tags, category, speakers)
+        self.type = "playlist"
+        self.videos = []
+        self.__config_source()
+
+    def __config_source(self):
+        if not self.local:
+            # calculate youtube url
+            playlist_id = self.source_file  # user gave just the youtube playlist id
+            if self.source_file.startswith("http") or self.source_file.startswith("www"):
+                parsed_url = urlparse(self.source_file)
+                playlist_id = parse_qs(parsed_url.query)["list"][0]
+            url = "https://www.youtube.com/playlist?list=" + playlist_id
+            videos = pytube.Playlist(url)
+            if videos is None:
+                raise Exception(f"Playlist '{url}' is empty")
+            for source_file in videos:
+                source = Video(source_file, self.local, self.title,
+                               self.event_date, self.tags, self.category, self.speakers)
+                self.videos.append(source)
diff --git a/app/transcription.py b/app/transcription.py
new file mode 100644
index 0000000..e8f16eb
--- /dev/null
+++ b/app/transcription.py
@@ -0,0 +1,233 @@
+import json
+import logging
+import os
+import re
+import tempfile
+import time
+from datetime import datetime
+
+from dotenv import dotenv_values
+import pytube
+from pytube.exceptions import PytubeError
+import requests
+
+from app.transcript import Transcript, Audio, Video, Playlist, RSS
+from app import __app_name__, __version__, application
+
+
+class Transcription:
+    def __init__(self, loc="test/test", model="tiny", chapters=False, pr=False, summarize=False, deepgram=False, diarize=False, upload=False, model_output_dir="local_models/", nocleanup=False, queue=True, markdown=False, username=None, test_mode=False, working_dir=None):
+        self.model = model
+        self.transcript_by = "username" if test_mode else self.__get_username()
+        # location in the bitcointranscripts hierarchy
+        self.loc = loc.strip("/")
+        self.generate_chapters = chapters
+        self.open_pr = pr
+        self.summarize_transcript = summarize
+        self.service = "deepgram" if deepgram else model
+        self.diarize = diarize
+        self.upload = upload
+        self.model_output_dir = model_output_dir
+        self.transcripts = []
+        self.nocleanup = nocleanup
+        # during testing we do not have/need a queuer backend
+        self.queue = queue if not test_mode else False
+        # during testing we need to create the markdown for validation purposes
+        self.markdown = markdown or test_mode
+        self.test_mode = test_mode
+        self.logger = logging.getLogger(__app_name__)
+        self.tmp_dir = working_dir if working_dir is not None else tempfile.mkdtemp()
+
+        self.logger.info(f"Temp directory: {self.tmp_dir}")
+
+    def _create_subdirectory(self, subdir_name):
+        """Helper method to create subdirectories within the central temp directory"""
+        subdir_path = os.path.join(self.tmp_dir, subdir_name)
+        os.makedirs(subdir_path)
+        return subdir_path
+
+    def __get_username(self):
+        try:
+            if os.path.isfile(".username"):
+                with open(".username", "r") as f:
+                    username = f.read()
+                    f.close()
+            else:
+                print("What is your github username?")
+                username = input()
+                with open(".username", "w") as f:
+                    f.write(username)
+                    f.close()
+            return username
+        except Exception as e:
+            raise Exception(f"Error getting username: {e}")
+
+    def _calculate_source_type(self, source):
+        def check_if_playlist(media):
+            """Helper function to check if a source is a playlist
+            based on its file name"""
+            try:
+                if (
+                    media.startswith("PL")
+                    or media.startswith("UU")
+                    or media.startswith("FL")
+                    or media.startswith("RD")
+                ):
+                    return True
+                playlists = list(pytube.Playlist(media).video_urls)
+                if type(playlists) is not list:
+                    return False
+                return True
+            except Exception as e:
+                return False
+
+        def check_if_video(media):
+            """Helper function to check if a source is a video
+            based on its file name"""
+            if media.endswith(".mp4"):
+                return True
+            if re.search(r"^([\dA-Za-z_-]{11})$", media):
+                return True
+            try:
+                pytube.YouTube(media)
+                return True
+            except PytubeError as e:
+                raise Exception(f"Pytube Error: {e}")
+
+        if source.endswith(".mp3") or source.endswith(".wav"):
+            return "audio"
+        elif check_if_playlist(source):
+            return "playlist"
+        elif check_if_video(source):
+            return "video"
+        else:
+            raise Exception(f"Invalid source: {source}")
+
+    def add_transcription_source(self, source_file, title=None, date=None, tags=None, category=None, speakers=None):
+        """Add a source for transcription.
+        The source type is calculated based on its file name."""
+
+        # check if source is a local file
+        local = False
+        if os.path.isfile(source_file):
+            local = True
+        # initialize source
+        source_type = self._calculate_source_type(source_file)
+        if source_type == "audio":
+            source = Audio(source_file, local, title,
+                           date, tags, category, speakers)
+            self.transcripts.append(Transcript(source, self.test_mode))
+        elif source_type == "playlist":
+            source = Playlist(source_file, local, title,
+                              date, tags, category, speakers)
+            for video in source.videos:
+                self.transcripts.append(Transcript(video, self.test_mode))
+        elif source_type == "video":
+            source = Video(source_file, local, title,
+                           date, tags, category, speakers)
+            self.transcripts.append(Transcript(source, self.test_mode))
+        else:
+            raise Exception(f"Invalid source: {source_file}")
+
+    def push_to_queue(self, transcript: Transcript, payload=None):
+        """Push the resulting transcript to a Queuer backend"""
+        def write_to_json(json_data):
+            time_in_str = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
+            file_path = os.path.join(
+                self.model_output_dir, f"{transcript.title}_{time_in_str}_payload.json"
+            )
+            with open(file_path, "w") as json_file:
+                json.dump(json_data, json_file, indent=4)
+            return file_path
+        try:
+            if payload is None:
+                # No payload has been given directly
+                # Construct the payload with the resulting transcript
+                payload = {
+                    "content": {
+                        "title": transcript.title,
+                        "transcript_by": f"{self.transcript_by} via TBTBTC v{__version__}",
+                        "categories": transcript.source.category,
+                        "tags": transcript.source.tags,
+                        "speakers": transcript.source.speakers,
+                        "loc": self.loc,
+                        "body": transcript.result,
+                    }
+                }
+                # Handle optional metadata fields
+                if transcript.source.event_date:
+                    payload["content"]["date"] = transcript.source.event_date if type(
+                        transcript.source.event_date) is str else transcript.source.event_date.strftime("%Y-%m-%d")
+                if not transcript.source.local:
+                    payload["content"]["media"] = transcript.source.source_file
+            # Check if the user opted out of sending the payload to the Queuer
+            if not self.queue:
+                # payload will not be sent to the Queuer backend
+                if self.test_mode:
+                    # queuer is disabled by default when testing but we still
+                    # return the payload to be used for testing purposes
+                    return payload
+                else:
+                    # store payload in case the user wants to manually send it to the queuer
+                    payload_json_file = write_to_json(payload)
+                    self.logger.info(
+                        f"Transcript not added to the queue, payload stored at: {payload_json_file}")
+                    return payload_json_file
+            # Push the payload with the resulting transcript to the Queuer backend
+            config = dotenv_values(".env")
+            if "QUEUE_ENDPOINT" not in config:
+                raise Exception(
+                    "To push to a queue you need to define a 'QUEUE_ENDPOINT' in your .env file")
+            if "BEARER_TOKEN" not in config:
+                raise Exception(
+                    "To push to a queue you need to define a 'BEARER_TOKEN' in your .env file")
+            url = config["QUEUE_ENDPOINT"] + "/api/transcripts"
+            headers = {
+                'Authorization': f'Bearer {config["BEARER_TOKEN"]}',
+                'Content-Type': 'application/json'
+            }
+            resp = requests.post(url, json=payload, headers=headers)
+            if resp.status_code == 200:
+                self.logger.info("Transcript added to queue")
+            return resp
+        except Exception as e:
+            self.logger.error(f"Transcript not added to queue: {e}")
+
+    def start(self, test_transcript=None):
+        self.result = []
+        try:
+            for transcript in self.transcripts:
+                tmp_dir = self._create_subdirectory(
+                    f"transcript{len(self.result) + 1}")
+                audio_file, _ = transcript.process_source(tmp_dir)
+                result = transcript.transcribe(
+                    tmp_dir,
+                    self.generate_chapters,
+                    self.summarize_transcript,
+                    self.service,
+                    self.diarize,
+                    self.upload,
+                    self.model_output_dir,
+                    test_transcript=test_transcript
+                )
+                if self.markdown:
+                    transcription_md_file = transcript.write_to_file(
+                        self.model_output_dir if not self.test_mode else tmp_dir,
+                        self.transcript_by)
+                if self.open_pr:
+                    application.create_pr(
+                        absolute_path=transcription_md_file,
+                        loc=self.loc,
+                        username=self.transcript_by,
+                        curr_time=str(round(time.time() * 1000)),
+                        title=transcript.title,
+                    )
+                else:
+                    self.push_to_queue(transcript)
+                self.result.append(transcription_md_file)
+            return self.result
+        except Exception as e:
+            raise Exception(f"Error with the transcription: {e}") from e
+
+    def clean_up(self):
+        self.logger.info("Cleaning up...")
+        application.clean_up(self.tmp_dir)
diff --git a/transcriber.py b/transcriber.py
index 063d962..d67ab01 100644
--- a/transcriber.py
+++ b/transcriber.py
@@ -1,9 +1,11 @@
 import logging
-from datetime import datetime
+import tempfile
 
 import click
 
 from app import __app_name__, __version__, application
+from app.transcript import Transcript
+from app.transcription import Transcription
 
 
 def setup_logger():
@@ -110,21 +112,12 @@ def print_help(ctx, param, value):
     default=False,
     help="Supply this flag if you want to generate chapters for the transcript",
 )
-@click.option(
-    "-h",
-    "--help",
-    is_flag=True,
-    callback=print_help,
-    expose_value=False,
-    is_eager=True,
-    help="Show the application's help and exit.",
-)
 @click.option(
     "-p",
     "--PR",
     is_flag=True,
     default=False,
-    help="Supply this flag if you want to generate a payload",
+    help="Supply this flag if you want to open a PR at the bitcointranscripts repo",
 )
 @click.option(
     "-D",
@@ -171,6 +164,24 @@ def print_help(ctx, param, value):
     help="Supply this flag if you want to upload processed model files to AWS "
     "S3",
 )
+@click.option(
+    "--nocleanup",
+    is_flag=True,
+    default=False,
+    help="Do not remove temp files on exit",
+)
+@click.option(
+    "--noqueue",
+    is_flag=True,
+    default=False,
+    help="Do not push the resulting transcript to the Queuer backend",
+)
+@click.option(
+    "--markdown",
+    is_flag=True,
+    default=False,
+    help="Create a markdown file for the resulting transcript",
+)
 def add(
     source: str,
     loc: str,
@@ -188,6 +199,9 @@ def add(
     upload: bool,
     verbose: bool,
     model_output_dir: str,
+    nocleanup: bool,
+    noqueue: bool,
+    markdown: bool
 ) -> None:
     """Supply a YouTube video id and directory for transcription. \n
     Note: The https links need to be wrapped in quotes when running the command
@@ -199,51 +213,36 @@ def add(
         logger.setLevel(logging.DEBUG)
     else:
         logger.setLevel(logging.WARNING)
+    tmp_dir = tempfile.mkdtemp()
 
     logger.info(
         "This tool will convert Youtube videos to mp3 files and then "
         "transcribe them to text using Whisper. "
     )
     try:
-        username = application.get_username()
-        loc = loc.strip("/")
-        event_date = None
-        if date:
-            try:
-                event_date = datetime.strptime(date, "%Y-%m-%d").date()
-            except ValueError as e:
-                logger.error("Supplied date is invalid: ", e)
-                return
-        (source_type, local) = application.check_source_type(source=source)
-        if source_type is None:
-            logger.error("Invalid source")
-            return
-        filename, tmp_dir = application.process_source(
-            source=source,
-            title=title,
-            event_date=event_date,
-            tags=tags,
-            category=category,
-            speakers=speakers,
+        transcription = Transcription(
             loc=loc,
             model=model,
-            username=username,
             chapters=chapters,
             pr=pr,
             summarize=summarize,
-            source_type=source_type,
            deepgram=deepgram,
            diarize=diarize,
            upload=upload,
            model_output_dir=model_output_dir,
-            verbose=verbose,
-            local=local
+            nocleanup=nocleanup,
+            queue=not noqueue,
+            markdown=markdown,
+            working_dir=tmp_dir
+        )
+        transcription.add_transcription_source(
+            source_file=source, title=title, date=date, tags=tags, category=category, speakers=speakers,
         )
-        if filename:
-            """INITIALIZE GIT AND OPEN A PR"""
-            logger.info("Transcription complete")
-        logger.info("Cleaning up...")
-        application.clean_up(tmp_dir)
+        transcription.start()
+        if nocleanup:
+            logger.info("Not cleaning up temp files...")
+        else:
+            transcription.clean_up()
     except Exception as e:
        logger.error(e)
-        logger.error("Cleaning up...")
+        logger.info(f"Exited with error, not cleaning up temp files: {tmp_dir}")
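
Usage sketch (not part of the diff): one way the refactored classes above could be wired together from Python, based on the signatures introduced in app/transcription.py. The video id, title, speakers, and loc below are placeholders, markdown=True / queue=False simply mirror the new --markdown / --noqueue CLI flags, and the sketch assumes the default local_models/ output directory exists and that a .username file is present (otherwise __get_username prompts for one).

    from app.transcription import Transcription

    transcription = Transcription(
        loc="misc",        # placeholder target directory in the bitcointranscripts hierarchy
        model="tiny",      # whisper model, used because deepgram is not enabled
        markdown=True,     # write the resulting transcript to a markdown file
        queue=False,       # skip the Queuer backend; the payload is stored as JSON instead
    )
    transcription.add_transcription_source(
        "aBcDeFgHiJk",     # hypothetical 11-character video id; a URL, playlist, or local .mp3/.wav/.mp4 also works
        title="Example talk",
        speakers="Alice, Bob",
    )
    transcription.start()
    transcription.clean_up()

The equivalent CLI call under the new flags would be along the lines of `python transcriber.py add "aBcDeFgHiJk" misc --markdown --noqueue`.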