diff --git a/app/application.py b/app/application.py index 57b6918..5f1c6c3 100644 --- a/app/application.py +++ b/app/application.py @@ -15,6 +15,8 @@ import time from dotenv import dotenv_values import yt_dlp +from deepgram import Deepgram +import mimetypes def download_video(url): @@ -55,19 +57,13 @@ def read_description(prefix): return list_of_chapters for index, x in enumerate(info['chapters']): name = x['title'] - start = x['start_time'] - m, s = divmod(start, 60) - h, m = divmod(m, 60) - current_dur = ':'.join([str(int(h)), str(int(m)), str(s)]) - start = current_dur - - list_of_chapters.append((str(index), str(start), str(name))) + list_of_chapters.append((str(index), start, str(name))) return list_of_chapters except Exception as e: print("Error reading description") - return list_of_chapters + return [] def write_chapters_file(chapter_file: str, chapter_list: list) -> None: @@ -85,33 +81,6 @@ def write_chapters_file(chapter_file: str, chapter_list: list) -> None: print(e) -def split_mp4(chapters: list, download_filename: str, download_name: str) -> None: - try: - current_duration_pretext = subprocess.run(['ffprobe', '-i', download_filename, - '-show_entries', 'format=duration', - '-v', 'quiet'], - capture_output=True, encoding='UTF8') - current_duration = float(current_duration_pretext.stdout[18:-13]) - m, s = divmod(current_duration, 60) - h, m = divmod(m, 60) - current_dur = ':'.join([str(int(h)), str(int(m)), str(s)]) - for current_index, current_chapter in enumerate(chapters): - # current_chapter will be a tuple: position, timecode, name - next_index = current_index + 1 - start_time = current_chapter[1] - try: - end_time = chapters[next_index][1] - except: - end_time = current_dur - output_name = f'{download_name} - ({current_index}).mp4' - subprocess.run(["ffmpeg", "-ss", start_time, "-to", end_time, - "-i", download_filename, "-acodec", "copy", - "-vcodec", "copy", output_name, "-loglevel", "quiet"]) - except Exception as e: - print("Error splitting mp4") - print(e) - - def convert_video_to_mp3(filename): try: clip = VideoFileClip(filename) @@ -165,6 +134,7 @@ def get_playlist_videos(url): print(e) return + def get_audio_file(url, title): print("URL: " + url) print("downloading audio file") @@ -184,19 +154,171 @@ def get_audio_file(url, title): def process_mp3(filename, model): - print("Transcribing audio to text...") + print("Transcribing audio to text using whisper ...") try: - mymodel = whisper.load_model(model) - result = mymodel.transcribe(filename[:-4] + ".mp3") - result = result["text"] + my_model = whisper.load_model(model) + result = my_model.transcribe(filename) + data = [] + for x in result["segments"]: + data.append(tuple((x["start"], x["end"], x["text"]))) print("Removed video and audio files") + return data + except Exception as e: + print("Error transcribing audio to text") + print(e) + return + + +def decimal_to_sexagesimal(dec): + sec = int(dec % 60) + minu = int((dec // 60) % 60) + hrs = int((dec // 60) // 60) + + return f'{hrs:02d}:{minu:02d}:{sec:02d}' + + +def combine_chapter(chapters, transcript): + try: + chapters_pointer = 0 + transcript_pointer = 0 + result = "" + # chapters index, start time, name + # transcript start time, end time, text + + while chapters_pointer < len(chapters) and transcript_pointer < len(transcript): + if chapters[chapters_pointer][1] <= transcript[transcript_pointer][0]: + result = result + "\n\n## " + chapters[chapters_pointer][2] + "\n\n" + chapters_pointer += 1 + else: + result = result + transcript[transcript_pointer][2] + transcript_pointer += 1 + + while transcript_pointer < len(transcript): + result = result + transcript[transcript_pointer][2] + transcript_pointer += 1 + + with open("result.md", "w") as file: + file.write(result) + return result + except Exception as e: + print("Error combining chapters") + print(e) + + +def combine_deepgram_chapters_with_diarization(deepgram_data, chapters): + try: + para = "" + string = "" + curr_speaker = None + words = deepgram_data["results"]["channels"][0]["alternatives"][0]["words"] + words_pointer = 0 + chapters_pointer = 0 + while chapters_pointer < len(chapters) and words_pointer < len(words): + if chapters[chapters_pointer][1] <= words[words_pointer]["start"]: + if para != "": + para = para.strip(" ") + string = string + para + "\n\n" + para = "" + string = string + f'## {chapters[chapters_pointer][2]}\n\n' + chapters_pointer += 1 + else: + if words[words_pointer]["speaker"] != curr_speaker: + if para != "": + para = para.strip(" ") + string = string + para + "\n\n" + para = "" + string = string + f'Speaker {words[words_pointer]["speaker"]}:' \ + f' {decimal_to_sexagesimal(words[words_pointer]["start"])}' + curr_speaker = words[words_pointer]["speaker"] + string = string + '\n\n' + + para = para + " " + words[words_pointer]["punctuated_word"] + words_pointer += 1 + while words_pointer < len(words): + if words[words_pointer]["speaker"] != curr_speaker: + if para != "": + para = para.strip(" ") + string = string + para + "\n\n" + para = "" + string = string + f'Speaker {words[words_pointer]["speaker"]}:' \ + f' {decimal_to_sexagesimal(words[words_pointer]["start"])}' + curr_speaker = words[words_pointer]["speaker"] + string = string + '\n\n' + + para = para + " " + words[words_pointer]["punctuated_word"] + words_pointer += 1 + para = para.strip(" ") + string = string + para + return string + except Exception as e: + print("Error combining deepgram chapters") + print(e) + + +def get_deepgram_transcript(deepgram_data, diarize): + if diarize: + para = "" + string = "" + curr_speaker = None + for word in deepgram_data["results"]["channels"][0]["alternatives"][0]["words"]: + if word["speaker"] != curr_speaker: + if para != "": + para = para.strip(" ") + string = string + para + "\n\n" + para = "" + string = string + f'Speaker {word["speaker"]}: {decimal_to_sexagesimal(word["start"])}' + curr_speaker = word["speaker"] + string = string + '\n\n' + + para = para + " " + word["punctuated_word"] + para = para.strip(" ") + string = string + para + return string + else: + return deepgram_data["results"]["channels"][0]["alternatives"][0]["transcript"] + + +def get_deepgram_summary(deepgram_data): + try: + summaries = deepgram_data["results"]["channels"][0]["alternatives"][0]["summaries"] + summary = "" + for x in summaries: + summary = summary + " " + x["summary"] + return summary.strip(" ") + except Exception as e: + print("Error getting summary") + print(e) + + +def process_mp3_deepgram(filename, summarize, diarize): + print("Transcribing audio to text using deepgram...") + try: + config = dotenv_values(".env") + dg_client = Deepgram(config["DEEPGRAM_API_KEY"]) + + with open(filename, "rb") as audio: + mimeType = mimetypes.MimeTypes().guess_type(filename)[0] + source = {'buffer': audio, 'mimetype': mimeType} + response = dg_client.transcription.sync_prerecorded(source, {'punctuate': True, 'speaker_labels': True, + 'diarize': diarize, 'smart_formatting': True, + 'summarize': summarize}) + audio.close() + return response except Exception as e: print("Error transcribing audio to text") print(e) return +def create_transcript(data): + result = "" + for x in data: + result = result + x[2] + " " + + return result + + def initialize(): try: print(''' @@ -211,7 +333,8 @@ def initialize(): print(e) -def write_to_file(result, loc, url, title, date, tags, category, speakers, video_title, username, local, test, pr): +def write_to_file(result, loc, url, title, date, tags, category, speakers, video_title, username, local, test, pr, + summary): try: transcribed_text = result if title: @@ -241,12 +364,14 @@ def write_to_file(result, loc, url, title, date, tags, category, speakers, video for i in range(len(category)): category[i] = category[i].strip() meta_data += f'categories: {category}\n' + if summary: + meta_data += f'summary: {summary}\n' file_name = video_title.replace(' ', '-') file_name_with_ext = "tmp/" + file_name + '.md' if date: - meta_data = meta_data + f'date: {date}\n' + meta_data += f'date: {date}\n' meta_data += '---\n' if test is not None or pr: @@ -265,12 +390,12 @@ def write_to_file(result, loc, url, title, date, tags, category, speakers, video print(e) -def get_md_file_path(result, loc, video, title, event_date, tags, category, speakers, username, local, video_title, test, - pr): +def get_md_file_path(result, loc, video, title, event_date, tags, category, speakers, username, local, video_title, + test, pr, summary=""): try: print("writing .md file") file_name_with_ext = write_to_file(result, loc, video, title, event_date, tags, category, speakers, video_title, - username, local, test, pr) + username, local, test, pr, summary) print("wrote .md file") absolute_path = os.path.abspath(file_name_with_ext) @@ -322,7 +447,7 @@ def check_source_type(source): def process_audio(source, title, event_date, tags, category, speakers, loc, model, username, local, - created_files, test, pr): + created_files, test, pr, deepgram, summarize, diarize): try: print("audio file detected") curr_time = str(round(time.time() * 1000)) @@ -332,9 +457,13 @@ def process_audio(source, title, event_date, tags, category, speakers, loc, mode print("Error: Please supply a title for the audio file") return None # process audio file + summary = None + result = None if not local: filename = get_audio_file(url=source, title=title) - abs_path = os.path.abspath(path=filename) + abs_path = os.path.abspath(path="tmp/" + filename) + print("filename", filename) + print("abs_path", abs_path) created_files.append(abs_path) else: filename = source.split("/")[-1] @@ -344,15 +473,23 @@ def process_audio(source, title, event_date, tags, category, speakers, loc, mode print("File not found") return if filename.endswith('wav'): + initialize() abs_path = convert_wav_to_mp3(abs_path=abs_path, filename=filename) created_files.append(abs_path) if test: result = test else: - result = process_mp3(abs_path, model) - absolute_path = get_md_file_path(result=result, loc=loc, video=source, title=title, event_date=event_date, tags=tags, - category=category, speakers=speakers, username=username, local=local, - video_title=filename[:-4], test=test, pr=pr) + if deepgram or summarize: + deepgram_resp = process_mp3_deepgram(filename=abs_path, summarize=summarize, diarize=diarize) + result = get_deepgram_transcript(deepgram_data=deepgram_resp, diarize=diarize) + if summarize: + summary = get_deepgram_summary(deepgram_data=deepgram_resp) + if not deepgram: + result = process_mp3(abs_path, model) + result = create_transcript(result) + absolute_path = get_md_file_path(result=result, loc=loc, video=source, title=title, event_date=event_date, + tags=tags, category=category, speakers=speakers, username=username, + local=local, video_title=filename[:-4], test=test, pr=pr, summary=summary) created_files.append(absolute_path) if pr: @@ -366,7 +503,7 @@ def process_audio(source, title, event_date, tags, category, speakers, loc, mode def process_videos(source, title, event_date, tags, category, speakers, loc, model, username, created_files, - chapters, pr): + chapters, pr, deepgram, summarize, diarize): try: print("Playlist detected") if source.startswith("http") or source.startswith("www"): @@ -385,7 +522,8 @@ def process_videos(source, title, event_date, tags, category, speakers, loc, mod for video in videos: filename = process_video(video=video, title=title, event_date=event_date, tags=tags, category=category, speakers=speakers, loc=loc, model=selected_model, username=username, - pr=pr, created_files=created_files, chapters=chapters, test=False) + pr=pr, created_files=created_files, chapters=chapters, test=False, diarize=diarize, + deepgram=deepgram, summarize=summarize) if filename is None: return None return filename @@ -394,10 +532,39 @@ def process_videos(source, title, event_date, tags, category, speakers, loc, mod print(e) -def process_video(video, title, event_date, tags, category, speakers, loc, model, username, created_files, - chapters, test, pr, local=False): +def combine_deepgram_with_chapters(deepgram_data, chapters): try: + chapters_pointer = 0 + words_pointer = 0 result = "" + words = deepgram_data["results"]["channels"][0]["alternatives"][0]["words"] + # chapters index, start time, name + # transcript start time, end time, text + while chapters_pointer < len(chapters) and words_pointer < len(words): + if chapters[chapters_pointer][1] <= words[words_pointer]["end"]: + result = result + "\n\n## " + chapters[chapters_pointer][2] + "\n\n" + chapters_pointer += 1 + else: + result = result + words[words_pointer]["punctuated_word"] + " " + words_pointer += 1 + + # Append the final chapter heading and remaining content + while chapters_pointer < len(chapters): + result = result + "\n\n## " + chapters[chapters_pointer][2] + "\n\n" + chapters_pointer += 1 + while words_pointer < len(words): + result = result + words[words_pointer]["punctuated_word"] + " " + words_pointer += 1 + + return result + except Exception as e: + print("Error combining deepgram with chapters") + print(e) + + +def process_video(video, title, event_date, tags, category, speakers, loc, model, username, created_files, + chapters, test, pr, local=False, deepgram=False, summarize=False, diarize=False): + try: curr_time = str(round(time.time() * 1000)) if not local: if "watch?v=" in video: @@ -422,51 +589,49 @@ def process_video(video, title, event_date, tags, category, speakers, loc, model print() print() + initialize() + summary = None + result = "" + deepgram_data = None if chapters and not test: chapters = read_description("tmp/") elif test: chapters = read_description("test/testAssets/") + convert_video_to_mp3(abs_path[:-4] + '.mp4') + if deepgram or summarize: + deepgram_data = process_mp3_deepgram(abs_path[:-4] + ".mp3", summarize=summarize, diarize=diarize) + result = get_deepgram_transcript(deepgram_data=deepgram_data, diarize=diarize) + if summarize: + print("Summarizing") + summary = get_deepgram_summary(deepgram_data=deepgram_data) + if not deepgram: + result = process_mp3(abs_path[:-4] + ".mp3", model) + created_files.append(abs_path[:-4] + ".mp3") if chapters and len(chapters) > 0: print("Chapters detected") write_chapters_file(abs_path[:-4] + '.chapters', chapters) created_files.append(abs_path[:-4] + '.chapters') - split_mp4(chapters=chapters, download_filename=abs_path, download_name=abs_path[:-4]) - initialize() - for current_index, chapter in enumerate(chapters): - print(f"Processing chapter {chapter[2]} {current_index + 1} of {len(chapters)}") - temp_filename = f'{abs_path[:-4]} - ({current_index}).mp4' - if not test: - file = convert_video_to_mp3(filename=temp_filename) - if file is None: - print("File not found") - return None - temp_res = process_mp3(filename=temp_filename, model=model) - created_files.append(temp_filename[:-4] + ".mp3") + if deepgram: + if diarize: + result = combine_deepgram_chapters_with_diarization(deepgram_data=deepgram_data, chapters=chapters) else: - temp_res = "" - created_files.append(temp_filename) - - if chapter[2].startswith(" None: """Supply a YouTube video id and directory for transcription. \n Note: The https links need to be wrapped in quotes when running the command on zsh @@ -77,8 +85,9 @@ def add( return filename = application.process_source(source=source, title=title, event_date=event_date, tags=tags, category=category, speakers=speakers, loc=loc, model=model, - username=username, chapters=chapters, pr=pr, - source_type=source_type, created_files=created_files) + username=username, chapters=chapters, pr=pr, summarize=summarize, + source_type=source_type, created_files=created_files, deepgram=deepgram, + diarize=diarize) if filename: """ INITIALIZE GIT AND OPEN A PR""" print("Transcription complete")