From 1551608cbaa77c3b71d9aba2028f0828c40d956e Mon Sep 17 00:00:00 2001 From: radu-mocanu Date: Mon, 19 May 2025 17:37:59 +0300 Subject: [PATCH 1/2] chore: first draft of financial-reconciliation-agent --- .../financial-reconciliation-agent/README.md | 0 .../agent_graph.py | 183 ++++++++++++++++++ .../financial-reconciliation-agent/input.json | 4 + .../langgraph.json | 7 + .../local_tools.py | 22 +++ .../mcp-internal/mcp.json | 9 + .../mcp-internal/pyproject.toml | 10 + .../mcp-internal/server.py | 170 ++++++++++++++++ .../mcp-internal/uipath.json | 11 ++ .../financial-reconciliation-agent/prompts.py | 39 ++++ .../pyproject.toml | 25 +++ 11 files changed, 480 insertions(+) create mode 100644 samples/financial-reconciliation-agent/README.md create mode 100644 samples/financial-reconciliation-agent/agent_graph.py create mode 100644 samples/financial-reconciliation-agent/input.json create mode 100644 samples/financial-reconciliation-agent/langgraph.json create mode 100644 samples/financial-reconciliation-agent/local_tools.py create mode 100644 samples/financial-reconciliation-agent/mcp-internal/mcp.json create mode 100644 samples/financial-reconciliation-agent/mcp-internal/pyproject.toml create mode 100644 samples/financial-reconciliation-agent/mcp-internal/server.py create mode 100644 samples/financial-reconciliation-agent/mcp-internal/uipath.json create mode 100644 samples/financial-reconciliation-agent/prompts.py create mode 100644 samples/financial-reconciliation-agent/pyproject.toml diff --git a/samples/financial-reconciliation-agent/README.md b/samples/financial-reconciliation-agent/README.md new file mode 100644 index 0000000..e69de29 diff --git a/samples/financial-reconciliation-agent/agent_graph.py b/samples/financial-reconciliation-agent/agent_graph.py new file mode 100644 index 0000000..0c209b0 --- /dev/null +++ b/samples/financial-reconciliation-agent/agent_graph.py @@ -0,0 +1,183 @@ +import enum +from contextlib import asynccontextmanager +from typing import Literal, TypedDict, Any, Optional + +from langchain_anthropic import ChatAnthropic +from langgraph.graph import START, StateGraph, END +from langgraph.prebuilt import create_react_agent +from langgraph.types import Command +from pydantic import BaseModel +from uipath_langchain.chat.models import UiPathAzureChatOpenAI +from langchain_mcp_adapters.client import MultiServerMCPClient + +from prompts import email_triage_prompt, refund_payment_agent_prompt, email_topic_extractor_prompt +from local_tools import retrieve_from_execution_context_tool +import os + + +class EmailTopic(enum.Enum): + """Email regarding a new payment""" + PAYMENT = "PAYMENT" + """Email regarding a payment refund""" + REFUND = "REFUND" + """Used for any email topic, other than payment or refund""" + OTHER = "OTHER" + +class EmailTopicExtractorStructure(TypedDict): + """The structure for email topic extraction response""" + email_topic: str + +if os.getenv("USE_UIPATH_AI_UNITS") and os.getenv("USE_UIPATH_AI_UNITS") == "true": + # other available UiPath chat models + # "anthropic.claude-3-5-sonnet-20240620-v1:0", + # "anthropic.claude-3-5-sonnet-20241022-v2:0", + # "anthropic.claude-3-7-sonnet-20250219-v1:0", + # "anthropic.claude-3-haiku-20240307-v1:0", + # "gemini-1.5-pro-001", + # "gemini-2.0-flash-001", + # "gpt-4o-2024-05-13", + # "gpt-4o-2024-08-06", + # "gpt-4o-2024-11-20", + # "gpt-4o-mini-2024-07-18", + # "o3-mini-2025-01-31", + llm = UiPathAzureChatOpenAI() +else: + llm = ChatAnthropic(model="claude-3-5-sonnet-latest") + +class OutputStructure(TypedDict): + """LLM message after finishing execution""" + message: str + """Whether agent execution should continue""" + should_continue: bool + +class GraphInput(BaseModel): + email_address: str + email_content: str + +class GraphOutput(BaseModel): + answer: str + +class State(BaseModel): + email_address: str + email_content: str + agent_message: str + email_topic: Optional[EmailTopic] + should_continue: bool + +def prepare_input(state: GraphInput): + return State( + email_address=state.email_address, + email_content=state.email_content, + should_continue=True, + agent_message="", + email_topic=None, + ) + +@asynccontextmanager +async def agent_mcp( + server_slug: str, + structured_output: Any = None, + extra_tools: Any = None): + async with MultiServerMCPClient() as client: + await client.connect_to_server_via_sse( + server_name="local-stripe-server", + url=server_slug, + headers={ + "Authorization": f"Bearer {os.getenv('UIPATH_ACCESS_TOKEN')}" + }, + timeout=60, + ) + + mcp_tools = client.get_tools() + if extra_tools: + available_tools = [*mcp_tools, *extra_tools] + else: + available_tools = [*mcp_tools] + if structured_output is not None: + agent = create_react_agent(llm, tools=available_tools, response_format=structured_output) + else: + agent = create_react_agent(llm, tools=available_tools) + + try: + yield agent + finally: + pass + +async def understand_email(state: State) -> Command: + result = await llm.with_structured_output(EmailTopicExtractorStructure).ainvoke( + [("system", email_topic_extractor_prompt), + ("user", "email content: " + state.email_content)] + ) + print(result) + return Command( + update={ + "email_topic":result["email_topic"] + } + ) + +async def check_email(state: State) -> Command: + async with (agent_mcp( + os.getenv("UIPATH_MCP_INTERNAL_SERVER_URL"), + structured_output = OutputStructure, + extra_tools = [retrieve_from_execution_context_tool]) + as agent): + response = await agent.ainvoke( + { + "messages":[("system", email_triage_prompt), + ("user", "email topic: " + str(state.email_topic.value)), + ("user", "email address: " + state.email_address)] + } + ) + # Extract the message from the agent's response + output = response["structured_response"] + print(output) + return Command( + update={ + "agent_message": output["message"], + "should_continue": output["should_continue"], + } + ) + +async def analyze_email_and_take_action(state: State) -> Command: + async with agent_mcp(os.getenv("UIPATH_MCP_EXTERNAL_SERVER_URL")) as agent: + response = await agent.ainvoke( + { + "messages":[("system", refund_payment_agent_prompt), + ("user", "email content:" + state.email_content), + ("user", "email address:" + state.email_address),] + } + ) + return Command( + update={ + "agent_message": str(response["messages"][-1].content), + } + ) +def collect_output(state: State) -> GraphOutput: + return GraphOutput(answer=str(state.agent_message)) + +def decide_next_node_after_email_validation(state: State) -> Literal["analyze_email_and_take_action", "collect_output"]: + if state.should_continue: + return "analyze_email_and_take_action" + return "collect_output" + +def decide_next_node_given_email_topic(state: State) -> Literal["collect_output", "check_email"]: + if state.email_topic == EmailTopic.OTHER: + return "collect_output" + return "check_email" + +builder = StateGraph(State, input=GraphInput, output=GraphOutput) +builder.add_node("prepare_input", prepare_input) +builder.add_node("check_email", check_email) +builder.add_node("collect_output", collect_output) +builder.add_node("analyze_email_and_take_action", analyze_email_and_take_action) +builder.add_node("understand_email", understand_email) + +builder.add_edge(START, "prepare_input") +builder.add_edge("prepare_input", "understand_email") +builder.add_conditional_edges("understand_email", decide_next_node_given_email_topic) +builder.add_conditional_edges("check_email", decide_next_node_after_email_validation) +builder.add_edge("analyze_email_and_take_action", "collect_output") +builder.add_edge("collect_output", END) + + +graph = builder.compile() diff --git a/samples/financial-reconciliation-agent/input.json b/samples/financial-reconciliation-agent/input.json new file mode 100644 index 0000000..47bef67 --- /dev/null +++ b/samples/financial-reconciliation-agent/input.json @@ -0,0 +1,4 @@ +{ + "email_address": "john.doe@gmail.com", + "email_content": "I want a refund for my latest transaction of cat food" +} \ No newline at end of file diff --git a/samples/financial-reconciliation-agent/langgraph.json b/samples/financial-reconciliation-agent/langgraph.json new file mode 100644 index 0000000..d6a42ac --- /dev/null +++ b/samples/financial-reconciliation-agent/langgraph.json @@ -0,0 +1,7 @@ +{ + "dependencies": ["."], + "graphs": { + "agent": "./agent_graph.py:graph" + }, + "env": ".env" +} diff --git a/samples/financial-reconciliation-agent/local_tools.py b/samples/financial-reconciliation-agent/local_tools.py new file mode 100644 index 0000000..a494e4b --- /dev/null +++ b/samples/financial-reconciliation-agent/local_tools.py @@ -0,0 +1,22 @@ +from langchain_core.tools import Tool + +def retrieve_from_execution_context(key: str) -> str: + import os + if os.getenv(key) is None: + return 'not found' + else: + return os.getenv(key) + + +retrieve_from_execution_context_tool = Tool.from_function( + func=retrieve_from_execution_context, + name="retrieve_from_execution_context", + description=""" Retrieve an execution context detail + + Args: + key (str): The key of the element to return + + Returns: + str: The value of the element in the execution context if found, else 'not found' + """, +) \ No newline at end of file diff --git a/samples/financial-reconciliation-agent/mcp-internal/mcp.json b/samples/financial-reconciliation-agent/mcp-internal/mcp.json new file mode 100644 index 0000000..d1c8f0a --- /dev/null +++ b/samples/financial-reconciliation-agent/mcp-internal/mcp.json @@ -0,0 +1,9 @@ +{ + "servers": { + "local-stripe-server": { + "transport": "stdio", + "command": "python", + "args": ["server.py"] + } + } +} \ No newline at end of file diff --git a/samples/financial-reconciliation-agent/mcp-internal/pyproject.toml b/samples/financial-reconciliation-agent/mcp-internal/pyproject.toml new file mode 100644 index 0000000..3165166 --- /dev/null +++ b/samples/financial-reconciliation-agent/mcp-internal/pyproject.toml @@ -0,0 +1,10 @@ +[project] +name = "local-financial-reconciliation-mcp-server" +version = "0.0.1" +description = "Local MCP server for stripe agent" +authors = [{ name = "Radu Mocanu" }] +dependencies = [ + "uipath-mcp>=0.0.84", + "uipath==2.0.56" +] +requires-python = ">=3.10" diff --git a/samples/financial-reconciliation-agent/mcp-internal/server.py b/samples/financial-reconciliation-agent/mcp-internal/server.py new file mode 100644 index 0000000..8a245df --- /dev/null +++ b/samples/financial-reconciliation-agent/mcp-internal/server.py @@ -0,0 +1,170 @@ +import os +from typing import Literal + +from mcp.server.fastmcp import FastMCP +from uipath import UiPath +import unirest + +mcp = FastMCP(name="Email checker MCP") +csv_file_name = "processed_or_blacklisted_emails-{0}.csv" +secret = os.getenv('UIPATH_SECRET') +check_email_api_key = None + + +def validate_with_check_mail_api(email: str, api_key: str) -> bool: + import requests + import json + + url = "https://mailcheck.p.rapidapi.com/" + + querystring = {"disable_test_connection": "true", "domain": email} + + headers = { + 'x-rapidapi-host': "mailcheck.p.rapidapi.com", + 'x-rapidapi-key': api_key + } + + response = requests.request("GET", url, headers=headers, params=querystring) + return json.loads(response.text)['block'] + + +def is_temporary_email(email: str) -> bool: + temp_email_domains = [ + "temp-mail.org", "temp.mail", "disposablemail.com", "tempmail.com", + "guerrillamail.com", "sharklasers.com", "grr.la", "guerrillamail.info", + "yopmail.com", "10minutemail.com", "mailinator.com", "throwawaymail.com" + ] + + domain = email.split('@')[-1].lower() + + return any(temp_domain in domain for temp_domain in temp_email_domains) + + +@mcp.tool() +async def is_valid_email(email_address: str) -> bool: + """Verify if email is temporary or not.""" + if check_email_api_key: + return not validate_with_check_mail_api(email_address, check_email_api_key) + return not is_temporary_email(email_address) + + +@mcp.tool() +async def mark_email(email_address: str, base_url: str, status: Literal["refund", "payment"], folder_key: str) -> bool: + """Mark an email as processed. + + Args: + email_address (str): The email address to be marked as processed. Example: "user@example.com" + base_url (str): The base URL of the UiPath cloud service. Example: "https://cloud.uipath.com" (UIPATH_URL) + status (Literal[str]): The email status. Example: "processed" + folder_key (str): The folder key of the UiPath cloud service. (UIPATH_FOLDER_KEY) + + Returns: + bool: True if the email was successfully marked as processed, False if there was an error + """ + import csv + from datetime import datetime + from io import StringIO + import os + os.environ['UIPATH_FOLDER_KEY'] = folder_key + + uipath = UiPath(secret=secret, base_url=base_url) + + try: + current_date = datetime.now().strftime('%Y-%m-%d') + local_file_path = f"./{csv_file_name.format(current_date)}" + + csv_content = [['email', 'status']] + + try: + uipath.buckets.download( + name="stripe", + blob_file_path=csv_file_name.format(current_date), + destination_path=local_file_path + ) + + if os.path.exists(local_file_path): + with open(local_file_path, 'r', newline='') as f: + reader = csv.reader(f) + csv_content = list(reader) + + except Exception: + # file might not exist + pass + + csv_content.append([email_address, status]) + + csv_buffer = StringIO() + writer = csv.writer(csv_buffer) + writer.writerows(csv_content) + + final_csv_content = csv_buffer.getvalue() + csv_buffer.close() + + uipath.buckets.upload_from_memory( + name="stripe", + content=final_csv_content, + content_type="text/csv", + blob_file_path=csv_file_name.format(current_date) + ) + + if os.path.exists(local_file_path): + os.remove(local_file_path) + + return True + + except Exception as e: + print(f"Error processing email: {str(e)}") + return False + + +@mcp.tool() +async def check_same_day_refund(email_address: str, base_url: str, folder_key: str) -> bool: + """Check if an email has been previously marked as processed. + + Args: + email_address (str): The email address to check. Example: "user@example.com" + base_url (str): The base URL of the UiPath cloud service. Example: "https://cloud.uipath.com" (UIPATH_URL) + folder_key (str): The folder key of the UiPath cloud service. (UIPATH_FOLDER_KEY) + + Returns: + bool: True if there was already a refund for the email in the same day, False otherwise. + """ + import csv + from datetime import datetime + import os + + os.environ['UIPATH_FOLDER_KEY'] = folder_key + uipath = UiPath(secret=secret, base_url=base_url) + + current_date = datetime.now().strftime('%Y-%m-%d') + local_file_path = f"./{csv_file_name.format(current_date)}" + + try: + uipath.buckets.download( + name="stripe", + blob_file_path=csv_file_name.format(current_date), + destination_path=local_file_path + ) + except Exception: + # File might not exist for today + return False + + result = False + if os.path.exists(local_file_path): + try: + with open(local_file_path, 'r', newline='') as f: + reader = csv.reader(f) + next(reader) # Skip header + for row in reader: + if row[0] == email_address and row[1] == 'refund': + result = True + break + finally: + os.remove(local_file_path) + return result + + return result + + +if __name__ == "__main__": + mcp.run() diff --git a/samples/financial-reconciliation-agent/mcp-internal/uipath.json b/samples/financial-reconciliation-agent/mcp-internal/uipath.json new file mode 100644 index 0000000..11b3858 --- /dev/null +++ b/samples/financial-reconciliation-agent/mcp-internal/uipath.json @@ -0,0 +1,11 @@ +{ + "entryPoints": [ + { + "filePath": "local-stripe-server", + "uniqueId": "0467ec94-9b1b-49e9-a003-3513a0ed8ff2", + "type": "mcpserver", + "input": {}, + "output": {} + } + ] +} \ No newline at end of file diff --git a/samples/financial-reconciliation-agent/prompts.py b/samples/financial-reconciliation-agent/prompts.py new file mode 100644 index 0000000..b7649bd --- /dev/null +++ b/samples/financial-reconciliation-agent/prompts.py @@ -0,0 +1,39 @@ +refund_payment_agent_prompt=""" +You are an advanced AI system designed to manage financial transactions based on inbound customer requests. +You have 2 duties: +1. Processing refund requests +2. Creating payment links + +1. PROCESSING REFUND REQUESTS +Check that the email of the customer requesting the refund is the same as the one that did the payment. + +2. CREATING PAYMENT LINKS +check if the product exits on the stripe catalog, then generate a payment link with the requested quantity (default to 1 if not quantity is specified). +lastly, if the client does not exist in customer list, create it. + +Conclude the process by generating a detailed step-by-step log of all the actions you performed. + +IMPORTANT +If you cannot complete the request, due to lack of information, create an email requesting the missing data. + +""" + +email_triage_prompt = """ +As a proficient assistant, your primary task is to examine the provided email address to ensure it isn't a temporary one. +If you verify it's temporary, discontinue the execution promptly (it should not continue further). +However, if it's not temporary mark the email accordingly. + +IMPORTANT +There can be a maximum of 1 refund per day for a given user email. + +You might need access to execution context details. Base url should be under key UIPATH_URL and folder key under UIPATH_FOLDER_KEY. +Conclude the process by generating a detailed step-by-step log of all the actions you performed.""" + +email_topic_extractor_prompt = """ +You are a professional email summarizer. +Decide what the email is about. +Your task is to read the email, decipher its purpose, and appropriately slot it in one of these well-defined categories: + 1. PAYMENT (i.e. someone wants to buy something) + 2. REFUND (i.e. someone wants to return something) + 3. OTHER +""" diff --git a/samples/financial-reconciliation-agent/pyproject.toml b/samples/financial-reconciliation-agent/pyproject.toml new file mode 100644 index 0000000..7f9abbd --- /dev/null +++ b/samples/financial-reconciliation-agent/pyproject.toml @@ -0,0 +1,25 @@ +[project] +name = "financial-reconciliation-agent" +version = "0.0.4" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.11" +authors = [{ name = "Radu Mocanu" }] +dependencies = [ + "langgraph>=0.2.70", + "langchain-core>=0.3.34", + "langchain-anthropic>=0.3.9", + "langgraph-checkpoint-sqlite>=2.0.3", + "python-dotenv>=1.0.1", + "uipath-langchain==0.0.110", + "pydantic>=2.10.6", + "aiohttp>=3.11.12", + "typing-extensions>=4.12.2", + "langchain-mcp-adapters>=0.0.5", + "ipython>=8.32.0", + "mcp>=1.4.1", + "langchain_mcp_adapters==0.0.9" +] + +[tool.setuptools] +py-modules = [] From d203e6df67306e06bac56b6d67dbcba4a841c454 Mon Sep 17 00:00:00 2001 From: radu-mocanu Date: Wed, 21 May 2025 10:35:33 +0300 Subject: [PATCH 2/2] chore: first draft of financial-reconciliation-agent --- .../agent_graph.py | 20 +++++++++---------- .../mcp-internal/server.py | 10 +++++----- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/samples/financial-reconciliation-agent/agent_graph.py b/samples/financial-reconciliation-agent/agent_graph.py index 0c209b0..484c727 100644 --- a/samples/financial-reconciliation-agent/agent_graph.py +++ b/samples/financial-reconciliation-agent/agent_graph.py @@ -115,7 +115,7 @@ async def understand_email(state: State) -> Command: } ) -async def check_email(state: State) -> Command: +async def triage_email(state: State) -> Command: async with (agent_mcp( os.getenv("UIPATH_MCP_INTERNAL_SERVER_URL"), structured_output = OutputStructure, @@ -138,7 +138,7 @@ async def check_email(state: State) -> Command: } ) -async def analyze_email_and_take_action(state: State) -> Command: +async def handle_transaction(state: State) -> Command: async with agent_mcp(os.getenv("UIPATH_MCP_EXTERNAL_SERVER_URL")) as agent: response = await agent.ainvoke( { @@ -155,28 +155,28 @@ async def analyze_email_and_take_action(state: State) -> Command: def collect_output(state: State) -> GraphOutput: return GraphOutput(answer=str(state.agent_message)) -def decide_next_node_after_email_validation(state: State) -> Literal["analyze_email_and_take_action", "collect_output"]: +def decide_next_node_after_email_validation(state: State) -> Literal["handle_transaction", "collect_output"]: if state.should_continue: - return "analyze_email_and_take_action" + return "handle_transaction" return "collect_output" -def decide_next_node_given_email_topic(state: State) -> Literal["collect_output", "check_email"]: +def decide_next_node_given_email_topic(state: State) -> Literal["collect_output", "triage_email"]: if state.email_topic == EmailTopic.OTHER: return "collect_output" - return "check_email" + return "triage_email" builder = StateGraph(State, input=GraphInput, output=GraphOutput) builder.add_node("prepare_input", prepare_input) -builder.add_node("check_email", check_email) +builder.add_node("triage_email", triage_email) builder.add_node("collect_output", collect_output) -builder.add_node("analyze_email_and_take_action", analyze_email_and_take_action) +builder.add_node("handle_transaction", handle_transaction) builder.add_node("understand_email", understand_email) builder.add_edge(START, "prepare_input") builder.add_edge("prepare_input", "understand_email") builder.add_conditional_edges("understand_email", decide_next_node_given_email_topic) -builder.add_conditional_edges("check_email", decide_next_node_after_email_validation) -builder.add_edge("analyze_email_and_take_action", "collect_output") +builder.add_conditional_edges("triage_email", decide_next_node_after_email_validation) +builder.add_edge("handle_transaction", "collect_output") builder.add_edge("collect_output", END) diff --git a/samples/financial-reconciliation-agent/mcp-internal/server.py b/samples/financial-reconciliation-agent/mcp-internal/server.py index 8a245df..1f9b6d2 100644 --- a/samples/financial-reconciliation-agent/mcp-internal/server.py +++ b/samples/financial-reconciliation-agent/mcp-internal/server.py @@ -3,12 +3,9 @@ from mcp.server.fastmcp import FastMCP from uipath import UiPath -import unirest mcp = FastMCP(name="Email checker MCP") csv_file_name = "processed_or_blacklisted_emails-{0}.csv" -secret = os.getenv('UIPATH_SECRET') -check_email_api_key = None def validate_with_check_mail_api(email: str, api_key: str) -> bool: @@ -43,8 +40,11 @@ def is_temporary_email(email: str) -> bool: @mcp.tool() async def is_valid_email(email_address: str) -> bool: """Verify if email is temporary or not.""" - if check_email_api_key: - return not validate_with_check_mail_api(email_address, check_email_api_key) + if check_email_api_key is not None: + try: + return not validate_with_check_mail_api(email_address, check_email_api_key) + except Exception: + pass return not is_temporary_email(email_address)