import enum
import os
from contextlib import asynccontextmanager
from typing import Any, Literal, Optional, TypedDict

from langchain_anthropic import ChatAnthropic
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.graph import END, START, StateGraph
from langgraph.prebuilt import create_react_agent
from langgraph.types import Command
from pydantic import BaseModel
from uipath_langchain.chat.models import UiPathAzureChatOpenAI

from local_tools import retrieve_from_execution_context_tool
from prompts import email_topic_extractor_prompt, email_triage_prompt, refund_payment_agent_prompt


class EmailTopic(enum.Enum):
    """Topic categories used to route an incoming email."""

    PAYMENT = "PAYMENT"
    """Email regarding a new payment"""
    REFUND = "REFUND"
    """Email regarding a payment refund"""
    OTHER = "OTHER"
    """Used for any email topic other than payment or refund"""

class EmailTopicExtractorStructure(TypedDict):
    """The structure of the email topic extraction response"""
    email_topic: str  # expected to match an EmailTopic value

if os.getenv("USE_UIPATH_AI_UNITS") == "true":
    # other available UiPath chat models:
    # "anthropic.claude-3-5-sonnet-20240620-v1:0",
    # "anthropic.claude-3-5-sonnet-20241022-v2:0",
    # "anthropic.claude-3-7-sonnet-20250219-v1:0",
    # "anthropic.claude-3-haiku-20240307-v1:0",
    # "gemini-1.5-pro-001",
    # "gemini-2.0-flash-001",
    # "gpt-4o-2024-05-13",
    # "gpt-4o-2024-08-06",
    # "gpt-4o-2024-11-20",
    # "gpt-4o-mini-2024-07-18",
    # "o3-mini-2025-01-31",
    llm = UiPathAzureChatOpenAI()
else:
    llm = ChatAnthropic(model="claude-3-5-sonnet-latest")

class OutputStructure(TypedDict):
    """Structured response returned by the triage agent."""
    # LLM message after finishing execution
    message: str
    # Whether agent execution should continue
    should_continue: bool

class GraphInput(BaseModel):
    email_address: str
    email_content: str

class GraphOutput(BaseModel):
    answer: str

class State(BaseModel):
    email_address: str
    email_content: str
    agent_message: str
    email_topic: Optional[EmailTopic]
    should_continue: bool

def prepare_input(state: GraphInput) -> State:
    """Initialize the full graph state from the incoming email."""
    return State(
        email_address=state.email_address,
        email_content=state.email_content,
        should_continue=True,
        agent_message="",
        email_topic=None,
    )

@asynccontextmanager
async def agent_mcp(
    server_slug: str,
    structured_output: Any = None,
    extra_tools: Any = None,
):
    """Connect to the MCP server at ``server_slug`` over SSE and yield a ReAct agent
    bound to the server's tools plus any ``extra_tools``."""
    async with MultiServerMCPClient() as client:
        await client.connect_to_server_via_sse(
            server_name="local-stripe-server",
            url=server_slug,
            headers={
                "Authorization": f"Bearer {os.getenv('UIPATH_ACCESS_TOKEN')}"
            },
            timeout=60,
        )

        mcp_tools = client.get_tools()
        if extra_tools:
            available_tools = [*mcp_tools, *extra_tools]
        else:
            available_tools = [*mcp_tools]
        if structured_output is not None:
            agent = create_react_agent(llm, tools=available_tools, response_format=structured_output)
        else:
            agent = create_react_agent(llm, tools=available_tools)

        yield agent

async def understand_email(state: State) -> Command:
    """Classify the incoming email into one of the EmailTopic categories."""
    result = await llm.with_structured_output(EmailTopicExtractorStructure).ainvoke(
        [("system", email_topic_extractor_prompt),
         ("user", "email content: " + state.email_content)]
    )
    print(result)
    return Command(
        update={
            "email_topic": result["email_topic"]
        }
    )

async def check_email(state: State) -> Command:
    """Triage the email (topic and sender address) via the internal MCP server agent
    and decide whether processing should continue."""
    async with agent_mcp(
        os.getenv("UIPATH_MCP_INTERNAL_SERVER_URL"),
        structured_output=OutputStructure,
        extra_tools=[retrieve_from_execution_context_tool],
    ) as agent:
        response = await agent.ainvoke(
            {
                "messages": [("system", email_triage_prompt),
                             ("user", "email topic: " + str(state.email_topic.value)),
                             ("user", "email address: " + state.email_address)]
            }
        )
        # Extract the structured message from the agent's response
        output = response["structured_response"]
        print(output)
        return Command(
            update={
                "agent_message": output["message"],
                "should_continue": output["should_continue"],
            }
        )

async def analyze_email_and_take_action(state: State) -> Command:
    """Handle the payment or refund request through the external MCP server agent."""
    async with agent_mcp(os.getenv("UIPATH_MCP_EXTERNAL_SERVER_URL")) as agent:
        response = await agent.ainvoke(
            {
                "messages": [("system", refund_payment_agent_prompt),
                             ("user", "email content: " + state.email_content),
                             ("user", "email address: " + state.email_address)]
            }
        )
        return Command(
            update={
                "agent_message": str(response["messages"][-1].content),
            }
        )

def collect_output(state: State) -> GraphOutput:
    """Expose the final agent message as the graph output."""
    return GraphOutput(answer=str(state.agent_message))

def decide_next_node_after_email_validation(state: State) -> Literal["analyze_email_and_take_action", "collect_output"]:
    if state.should_continue:
        return "analyze_email_and_take_action"
    return "collect_output"

def decide_next_node_given_email_topic(state: State) -> Literal["collect_output", "check_email"]:
    if state.email_topic == EmailTopic.OTHER:
        return "collect_output"
    return "check_email"

builder = StateGraph(State, input=GraphInput, output=GraphOutput)
builder.add_node("prepare_input", prepare_input)
builder.add_node("check_email", check_email)
builder.add_node("collect_output", collect_output)
builder.add_node("analyze_email_and_take_action", analyze_email_and_take_action)
builder.add_node("understand_email", understand_email)

builder.add_edge(START, "prepare_input")
builder.add_edge("prepare_input", "understand_email")
builder.add_conditional_edges("understand_email", decide_next_node_given_email_topic)
builder.add_conditional_edges("check_email", decide_next_node_after_email_validation)
builder.add_edge("analyze_email_and_take_action", "collect_output")
builder.add_edge("collect_output", END)

graph = builder.compile()
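

# A minimal local-run sketch (an assumption, not part of the original workflow):
# the compiled graph is presumably invoked by the UiPath runtime in production,
# but it can also be exercised directly with LangGraph's ainvoke once the
# required environment is set up (ANTHROPIC_API_KEY or the UiPath AI Units
# variables, plus the MCP server URLs and access token referenced above).
# The sample email below is illustrative only.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        # The input dict mirrors the GraphInput schema declared above.
        result = await graph.ainvoke(
            {
                "email_address": "customer@example.com",
                "email_content": "Hi, I was charged twice for my last order and would like a refund.",
            }
        )
        # Prints a GraphOutput-shaped result, e.g. {"answer": "..."}.
        print(result)

    asyncio.run(_demo())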