diff --git a/examples/pipelines/providers/anthropic_manifold_pipeline.py b/examples/pipelines/providers/anthropic_manifold_pipeline.py index 98b27d5a..058b0645 100644 --- a/examples/pipelines/providers/anthropic_manifold_pipeline.py +++ b/examples/pipelines/providers/anthropic_manifold_pipeline.py @@ -1,8 +1,8 @@ """ title: Anthropic Manifold Pipeline author: justinh-rahb, sriparashiva -date: 2024-06-20 -version: 1.4 +date: 2024-12-23 +version: 1.5 license: MIT description: A pipeline for generating text and processing images using the Anthropic API. requirements: requests, sseclient-py @@ -31,27 +31,49 @@ def __init__(self): self.valves = self.Valves( **{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", "your-api-key-here")} ) - self.url = 'https://api.anthropic.com/v1/messages' + self.url = "https://api.anthropic.com/v1/messages" + self.pipelines = [] self.update_headers() + self.get_anthropic_models() def update_headers(self): self.headers = { - 'anthropic-version': '2023-06-01', - 'content-type': 'application/json', - 'x-api-key': self.valves.ANTHROPIC_API_KEY + "anthropic-version": "2023-06-01", + "content-type": "application/json", + "x-api-key": self.valves.ANTHROPIC_API_KEY, } def get_anthropic_models(self): - return [ - {"id": "claude-3-haiku-20240307", "name": "claude-3-haiku"}, - {"id": "claude-3-opus-20240229", "name": "claude-3-opus"}, - {"id": "claude-3-sonnet-20240229", "name": "claude-3-sonnet"}, - {"id": "claude-3-5-haiku-20241022", "name": "claude-3.5-haiku"}, - {"id": "claude-3-5-sonnet-20241022", "name": "claude-3.5-sonnet"}, - ] + if self.valves.ANTHROPIC_API_KEY: + try: + list_models = requests.get( + "https://api.anthropic.com/v1/models", + headers=self.headers, + ).json() + + models = list_models["data"] + self.pipelines = [ + { + "id": model["id"], + "name": model["display_name"], + } + for model in models + ] + except Exception as e: + print(f"Error: {e}") + self.pipelines = [ + { + "id": self.id, + "name": "Could not fetch models from Anthropic, please update the API Key in the valves.", + }, + ] + else: + self.pipelines = [] async def on_startup(self): print(f"on_startup:{__name__}") + self.update_headers() + self.get_anthropic_models() pass async def on_shutdown(self): @@ -60,9 +82,7 @@ async def on_shutdown(self): async def on_valves_updated(self): self.update_headers() - - def pipelines(self) -> List[dict]: - return self.get_anthropic_models() + self.get_anthropic_models() def process_image(self, image_data): if image_data["url"].startswith("data:image"): @@ -87,7 +107,7 @@ def pipe( ) -> Union[str, Generator, Iterator]: try: # Remove unnecessary keys - for key in ['user', 'chat_id', 'title']: + for key in ["user", "chat_id", "title"]: body.pop(key, None) system_message, messages = pop_system_message(messages) @@ -101,28 +121,40 @@ def pipe( if isinstance(message.get("content"), list): for item in message["content"]: if item["type"] == "text": - processed_content.append({"type": "text", "text": item["text"]}) + processed_content.append( + {"type": "text", "text": item["text"]} + ) elif item["type"] == "image_url": if image_count >= 5: - raise ValueError("Maximum of 5 images per API call exceeded") + raise ValueError( + "Maximum of 5 images per API call exceeded" + ) processed_image = self.process_image(item["image_url"]) processed_content.append(processed_image) if processed_image["source"]["type"] == "base64": - image_size = len(processed_image["source"]["data"]) * 3 / 4 + image_size = ( + len(processed_image["source"]["data"]) * 3 / 4 + ) else: image_size = 0 total_image_size += image_size if total_image_size > 100 * 1024 * 1024: - raise ValueError("Total size of images exceeds 100 MB limit") + raise ValueError( + "Total size of images exceeds 100 MB limit" + ) image_count += 1 else: - processed_content = [{"type": "text", "text": message.get("content", "")}] + processed_content = [ + {"type": "text", "text": message.get("content", "")} + ] - processed_messages.append({"role": message["role"], "content": processed_content}) + processed_messages.append( + {"role": message["role"], "content": processed_content} + ) # Prepare the payload payload = { @@ -145,7 +177,9 @@ def pipe( return f"Error: {e}" def stream_response(self, payload: dict) -> Generator: - response = requests.post(self.url, headers=self.headers, json=payload, stream=True) + response = requests.post( + self.url, headers=self.headers, json=payload, stream=True + ) if response.status_code == 200: client = sseclient.SSEClient(response) @@ -170,6 +204,8 @@ def get_completion(self, payload: dict) -> str: response = requests.post(self.url, headers=self.headers, json=payload) if response.status_code == 200: res = response.json() - return res["content"][0]["text"] if "content" in res and res["content"] else "" + return ( + res["content"][0]["text"] if "content" in res and res["content"] else "" + ) else: raise Exception(f"Error: {response.status_code} - {response.text}") diff --git a/examples/pipelines/providers/cloudflare_ai_pipeline.py b/examples/pipelines/providers/cloudflare_ai_pipeline.py index 3bbcadc2..29301c22 100644 --- a/examples/pipelines/providers/cloudflare_ai_pipeline.py +++ b/examples/pipelines/providers/cloudflare_ai_pipeline.py @@ -1,24 +1,20 @@ -from typing import List, Union, Generator, Iterator -from schemas import OpenAIChatMessage -from pydantic import BaseModel import os +from typing import Generator, Iterator, List, Union + import requests +from pydantic import BaseModel class Pipeline: class Valves(BaseModel): CLOUDFLARE_ACCOUNT_ID: str = "" CLOUDFLARE_API_KEY: str = "" - CLOUDFLARE_MODEL: str = "" pass def __init__(self): - # Optionally, you can set the id and name of the pipeline. - # Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline. - # The identifier must be unique across all pipelines. - # The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes. - # self.id = "openai_pipeline" - self.name = "Cloudlfare AI" + self.type = "manifold" + self.name = "Cloudflare/" + self.id = "cloudflare" self.valves = self.Valves( **{ "CLOUDFLARE_ACCOUNT_ID": os.getenv( @@ -28,17 +24,50 @@ def __init__(self): "CLOUDFLARE_API_KEY": os.getenv( "CLOUDFLARE_API_KEY", "your-cloudflare-api-key" ), - "CLOUDFLARE_MODEL": os.getenv( - "CLOUDFLARE_MODELS", - "@cf/meta/llama-3.1-8b-instruct", - ), } ) - pass + self.pipelines = [] + self.update_headers() + self.get_models() + + def update_headers(self): + self.headers = { + "Authorization": f"Bearer {self.valves.CLOUDFLARE_API_KEY}", + "content-type": "application/json", + } + + def get_models(self): + if self.valves.CLOUDFLARE_ACCOUNT_ID and self.valves.CLOUDFLARE_API_KEY: + try: + list_models = requests.get( + f"https://api.cloudflare.com/client/v4/accounts/{self.valves.CLOUDFLARE_ACCOUNT_ID}/ai/models/search?task=Text%20Generation", + headers=self.headers, + ).json() + + models = list_models["result"] + self.pipelines = [ + { + "id": model["name"].replace("/", "___"), + "name": model["name"].replace("/", "___").split("___")[-1], + } + for model in models + ] + except Exception as e: + print(f"Error: {e}") + self.pipelines = [ + { + "id": self.id, + "name": "Could not fetch models from Cloudflare, please update the API Key in the valves.", + }, + ] + else: + self.pipelines = [] async def on_startup(self): # This function is called when the server is started. print(f"on_startup:{__name__}") + self.update_headers() + self.get_models() pass async def on_shutdown(self): @@ -46,17 +75,20 @@ async def on_shutdown(self): print(f"on_shutdown:{__name__}") pass + async def on_valves_updated(self): + self.update_headers() + self.get_models() + def pipe( self, user_message: str, model_id: str, messages: List[dict], body: dict ) -> Union[str, Generator, Iterator]: # This is where you can add your custom pipelines like RAG. print(f"pipe:{__name__}") - headers = {} - headers["Authorization"] = f"Bearer {self.valves.CLOUDFLARE_API_KEY}" - headers["Content-Type"] = "application/json" + # fix model name again, url messed up otherwise + model = model_id.replace("___", "/") - payload = {**body, "model": self.valves.CLOUDFLARE_MODEL} + payload = {**body, "model": model} if "user" in payload: del payload["user"] @@ -69,7 +101,7 @@ def pipe( r = requests.post( url=f"https://api.cloudflare.com/client/v4/accounts/{self.valves.CLOUDFLARE_ACCOUNT_ID}/ai/v1/chat/completions", json=payload, - headers=headers, + headers=self.headers, stream=True, )