diff --git a/images/examples/voice_agent/daily-api-key.png b/images/examples/voice_agent/daily-api-key.png new file mode 100644 index 00000000..e0ad119d Binary files /dev/null and b/images/examples/voice_agent/daily-api-key.png differ diff --git a/mint.json b/mint.json index 96cf3cb6..4262e53d 100644 --- a/mint.json +++ b/mint.json @@ -151,7 +151,8 @@ "v4/examples/sdxl", "v4/examples/mistral-vllm", "v4/examples/streaming-falcon-7B", - "v4/examples/transcribe-whisper" + "v4/examples/transcribe-whisper", + "v4/examples/aiVoiceAgents" ] } ], diff --git a/v4/examples/aiVoiceAgents.mdx b/v4/examples/aiVoiceAgents.mdx new file mode 100644 index 00000000..77a60088 --- /dev/null +++ b/v4/examples/aiVoiceAgents.mdx @@ -0,0 +1,357 @@ +--- +title: "Real-time Voice Agent" +description: "Deploy a real-time AI voice agent" +--- + +In this tutorial, I am going to create a real-time voice agent that can respond to any query via speech, in speech, in ~500ms. This is an extremely flexible implementation where you have the +ability to swap in a any LLM's or Text-to-speech (TTS) model of your liking. This is extremely useful for use cases involving voice such as customer support bots, receptionists and many more. + +In order to create this application, we use the [PipeCat](https://www.pipecat.ai/) framework that takes care of stringing together all these components as well as handles some of the functionality we +might need such as handling user interuptions, dealing with audio data etc. We will show this functionality by joining a meeting room with our voice agent using [Daily](https://daily.co) (the creators of Pipecat) and will deploy this application on Cerebrium to show how it handles deploying and scaling our application seamlessly. + +You can find the final version of the code [here](TODO) + +### Cerebrium setup + +If you don’t have a Cerebrium account, you can create one by signing up [here](https://dashboard.cerebrium.ai/register) and following the documentation [here](https://docs.cerebrium.ai/cerebrium/getting-started/installation) to get setup + +In your IDE, run the following command to create our Cerebrium starter project: `cerebrium init agent-tool-calling`. This creates two files: + +- **Main.py** - Our entrypoint file where our code lives +- **cerebrium.toml** - A configuration file that contains all our build and environment settings + ‍ +Add the following pip packages near the bottom of your cerebrium.toml. This will be used in creating our deployment environment. + +``` +[cerebrium.deployment] +docker_base_image_url = "registry.cerebrium.ai/daily:latest" + +[cerebrium.dependencies.pip] +torch = ">=2.0.0" +"pipecat-ai[silero, daily, openai, deepgram]" = "latest" +aiohttp = "latest" +torchaudio = "latest" +vllm = "latest" +huggingface_hub = "latest" +``` + +You will also see we specify a Docker base image above. The reason for this is Daily has supplied a Docker image that contains local [Deepgram](https://deepgram.com/) Speech-to-Text (STT) and Text-to-Speech (TTS) models. +This helps us achieve our low latency since everything is running locally and not going over the network. + +Docker files are not support yet but are rather in the works to be released soon. This is just a very early preview of how it would work. + +In this example we will be using Llama 3 8B as our LLM and serving it via vLLM. In order to use Llama 3, we need to be authenticated via Hugging Face. + +To authenticate ourselves we need to go to HuggingFace and accept the model permissions for [Lllama 8B](https://huggingface.co/meta-llama/Meta-Llama-3-8B-Instruct) if we haven’t already. It takes about 30 minutes or less for them to accept your request. + +In your Cerebrium dashboard you can add your HuggingFace token as a secret by navigating to “Secrets” in the sidebar. For the sake of this tutorial I called mine “HF_TOKEN”. We can now access these values in our code at runtime without exposing them in our code. + +You can then add the following code to your main.py: + +```python +from huggingface_hub import login +import subprocess + +login(token=get_secret('HF_TOKEN')) +login(token=get_secret('HF_TOKEN')) +# Run vllM Server in background process +def start_server(): + while True: + process = subprocess.Popen( + f"subprocess.Popen(f"python -m vllm.entrypoints.openai.api_server --port 5000 --model meta-llama/Meta-Llama-3-8B-Instruct --dtype bfloat16 --api-key {get_secret('HF_TOKEN')} --download-dir /persistent-storage/", shell=True)", + shell=True + ) + process.wait() # Wait for the process to complete + logger.error("Server process ended unexpectedly. Restarting in 5 seconds...") + time.sleep(5) # Wait before restarting + +# Start the server in a separate process +server_process = Process(target=start_server) +server_process.start() +``` + +Pipecat does not currently support locally instantiated models and requires them to follow the OpenAI compatible format. Therefore we run the vLLM server locally on our instance in a background process. +We run monitor the bacground process to make sure it launched successfully since there seems to be a bug with rapidly starting multiple vLLM instances. If it doesn't launch correctly, we wait 5 seconds before trying again. +Note, we are running the vLLM server on port 5000 (8000 is automatically used by Cerebrium) and we set the download directory of the model so that subsequent cold starts can be much quicker. + + +Now we implement the Pipecat framework by instantiating the various components. Create a function call main with the following code: +```python +import aiohttp +import os +import sys +import subprocess +import time +import requests +import asyncio +from multiprocessing import Process +from loguru import logger + +from pipecat.vad.vad_analyzer import VADParams +from pipecat.vad.silero import SileroVADAnalyzer +from pipecat.transports.services.daily import DailyParams, DailyTransport +from pipecat.services.openai import OpenAILLMService +from pipecat.services.deepgram import DeepgramSTTService +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.pipeline import Pipeline +from pipecat.frames.frames import LLMMessagesFrame, EndFrame + +from pipecat.processors.aggregators.llm_response import ( + LLMAssistantResponseAggregator, LLMUserResponseAggregator +) + +from helpers import ( + ClearableDeepgramTTSService, + AudioVolumeTimer, + TranscriptionTimingLogger +) + +logger.remove(0) +logger.add(sys.stderr, level="DEBUG") + +deepgram_voice: str = "aura-asteria-en" + +async def main(room_url: str, token: str = None): + + async with aiohttp.ClientSession() as session: + transport = DailyTransport( + room_url, + token if token else get_secret("DAILY_TOKEN"), + "Respond bots", + DailyParams( + audio_out_enabled=True, + transcription_enabled=False, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(params=VADParams( + stop_secs=0.2 + )), + vad_audio_passthrough=True + ) + ) + + stt = DeepgramSTTService( + name="STT", + api_key=None, + url='ws://127.0.0.1:8082/v1/listen' + ) + + tts = ClearableDeepgramTTSService( + name="Voice", + aiohttp_session=session, + api_key=None, + voice=deepgram_voice, + base_url="http://127.0.0.1:8082/v1/speak" + ) + + llm = OpenAILLMService( + name="LLM", + api_key=get_secret("HF_TOKEN"), + model="casperhansen/llama-3-8b-instruct-awq", + base_url="http://0.0.0.0:5000/v1" + ) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + }, + ] + + avt = AudioVolumeTimer() + tl = TranscriptionTimingLogger(avt) + + tma_in = LLMUserResponseAggregator(messages) + tma_out = LLMAssistantResponseAggregator(messages) + + pipeline = Pipeline([ + transport.input(), # Transport user input + avt, # Audio volume timer + stt, # Speech-to-text + tl, # Transcription timing logger + tma_in, # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + tma_out, # Assistant spoken responses + ]) + + task = PipelineTask( + pipeline, + PipelineParams( + allow_interruptions=True, + enable_metrics=True, + report_only_initial_ttfb=True + )) +``` + +First, in our main function we initialize the daily transfort layer in order to receive/send the audio/video data from the Daily room we will connect to. You can see we pass the room_url +we would like to join as well as a token to authenticate us programatically joining. We also set our VAD stop seconds which is the amount of time we wait for a pause before our bot will respond - in this +example we set it to 200 milliseconds. + +Next we connect to our locally running Deepgram models that come part of our Docker base image we specified in our cerebrium.toml - these are running on port 8082. This is where the Pipecat framework helps convert audio data to text +and vice versa. We then follow the same patten to connect our locally running LLM model from our vLLM server. + +Lastly, we then put this all together as a PipelineTask which is what Pipecat runs all together. The make up of a task is completely customisable and has support for Image and Vision use cases. You can read more [here](https://docs.pipecat.ai/docs/category/services). +Pipeline tasks come with a structure and parameters that make it easy to handle interruptions out the box and we are able to swap models to our preference only changing a few lines of code. + +The Daily Python SDK comes with a lot of event webhooks where you can trigger functionality based on events occurring. So let us handle how our bot handles certain events such as a user leaving/joining a call. +Continue to add the following code to the main() function. + +```python +# When the first participant joins, the bot should introduce itself. +@transport.event_handler("on_first_participant_joined") +async def on_first_participant_joined(transport, participant): + # Kick off the conversation. + messages.append( + {"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frame(LLMMessagesFrame(messages)) + +# When a participant joins, start transcription for that participant so the +# bot can "hear" and respond to them. +@transport.event_handler("on_participant_joined") +async def on_participant_joined(transport, participant): + transport.capture_participant_transcription(participant["id"]) + +# When the participant leaves, we exit the bot. +@transport.event_handler("on_participant_left") +async def on_participant_left(transport, participant, reason): + await task.queue_frame(EndFrame()) + +# If the call is ended make sure we quit as well. +@transport.event_handler("on_call_state_updated") +async def on_call_state_updated(transport, state): + if state == "left": + await task.queue_frame(EndFrame()) + +runner = PipelineRunner() + +await runner.run(task) +await session.close() +``` + +Above we handle the following events: + +- When the first participant joins, we get the bot to introduce itself to the user. We do this by adding a message to the conversation. +- We add support for multiple participants to join and listen/respond to the bot. +- When a participant leaves or the call is ended, we get the bot to terminate itself. + +From the code above, you will see the events are attached to Transport, which is the method of communication - in this case the meeting room. We then pass in our defined Pipeline task +to our pipeline runner which executre indefinitely until we signal it to exit which in this case happens when a call ends. If you want to read further about the PipeCat infrastructure +you can read more [here](https://docs.pipecat.ai/docs/understanding-bots/dailyai-architecture) + +The above code needs to be run in a seperate execution environment so PipeCat does not get instantiate multiple instances. In order to do this, we need to run the above code as a +background process. This will be the entry point of our REST API endpoint to start the PipeCat bot. Once the pipecat bot has returned (ie: the call has ended) then we will return a response to our API endpoint. +We therefore create the following function: + +Currently, Cerebrium does not support workloads running longer than 5 minutes however it is currently being worked on internally and will be released soon. This means that +conversations are limited to a 5 minute window. If this is a issue and you have a urgent use case, please reach out to [support](mailto:support@cerebrium.ai) + +```python +def start_bot(room_url: str, token: str = None): + + def target(): + asyncio.run(main(room_url, token)) + + check_model_status() + process = Process(target=target) + process.start() + process.join() # Wait for the process to complete + return {"message": "session finished"} +``` + +Thats it! You now have a fully functioning AI bot that can interact with a user through speech in ~500ms. Image the possibilities! + +Let us now create a user facing UI in order for you to interface with this bot. + +## Creating Meeting Room + +Cerebrium doesn't only have to be used to run AI heavy workloads, it can run any Python code. Therefore we define two functions for our demo that will create a room to join programatically +and a temporary token, both of which will only be usable for 5 minutes. In order to implement this, we use the Daily REST API. + +We need to get our Daily developer token from our profile. If, you don't have an account you can sign up for one [here](https://dashboard.daily.co/u/signup) (they have a generous free tier). +You can then go to the "developers" tab in order to fetch your API key - add this to your Cerebrium Secrets. + +![Daily API Key](/images/examples/voice_agent/daily-api-key.png) + +Below we create a room that only lasts 5 minutes and a temporary token to access it + +```python +def create_room(): + url = "https://api.daily.co/v1/rooms/" + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {get_secret('DAILY_TOKEN')}" + } + data = { + "properties": { + "exp": int(time.time()) + 60*5 ##5 mins + } + } + + response = requests.post(url, headers=headers, json=data) + if response.status_code == 200: + room_info = response.json() + token = create_token(room_info['name']) + if token and 'token' in token: + room_info['token'] = token['token'] + else: + logger.error("Failed to create token") + return {"message": 'There was an error creating your room', "status_code": 500} + return room_info + else: + logger.error(f"Failed to create room: {response.status_code}") + return {"message": 'There was an error creating your room', "status_code": 500} + +def create_token(room_name: str): + url = "https://api.daily.co/v1/meeting-tokens" + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {get_secret('DAILY_TOKEN')}" + } + data = { + "properties": { + "room_name": room_name + } + } + + response = requests.post(url, headers=headers, json=data) + if response.status_code == 200: + token_info = response.json() + return token_info + else: + logger.error(f"Failed to create token: {response.status_code}") + return None +``` + +### Deploy to Cerebrium + +To deploy this application to Cerebrium you can simply run the command: cerebrium deploy in your terminal. + +If it deployed successfully, you should see something like this: + +![Cerebrium Deployment](/images/examples/langchain_langsmith/cerebrium_deploy.png) + +We will add these endpoints to our frontend interface. + +## Connect frontend + +We created a public fork of the PipeCat frontend in order to show you a nice demo of this application. You can clone the repo [here](https://github.com/CerebriumAI/web-client-ui). + +Follow the instructions in the README.md and then populate the following variables in your .env.development.local + +``` +VITE_SERVER_URL=https://api.cortex.cerebrium.ai/v4/p-xxxxx/ #This is the base url. Do not include the function names +VITE_SERVER_AUTH= #This is the JWT token you can get from the API Keys section of your Cerebrium Dashboard. +``` + +You can now run yarn dev and go to the url: http://localhost:5173/ to test your application! + +### Conclusion + +Hopefully this tutorial acts as a good starting point for you to implement voice into your application as well as extend it into image and vision capabilities. Pipecat is a +extensible and open-source framework that makes it easy to build applications like this and Cerebrium makes the process seamless to deploy and autoscale while only paying for the +compute you need. + + +Tag us as **@cerebrimai** so we can see what you build and please feel free to ask questions/send feedback to us on [Slack](https://join.slack.com/t/cerebriumworkspace/shared_invite/zt-1qojg3eac-q4xyu5O~MeniNIg2jNeadg) or [Discord](https://discord.gg/ATj6USmeE2) communities