Skip to content

Commit

Permalink
Feature: Store chat history in Cosmos DB (#2063)
Browse files Browse the repository at this point in the history
* Add a Cosmos DB resource for chat history

* Added Cosmos DB chat history feature to the backend

* Added Cosmos DB chat history feature to the frontend

* fix typo

* Reconfigure the additional API endpoints in a separate blueprint

* Refactor CosmosDB integration for chat history feature
Use the timestamp property instead of _ts

* Update docs

* Change cast to type annotation, add initial test

* Use authenticated decorators properly

* Add more unit tests for CosmosDB

* Remove unreachable code path, change 200 to 204 for delete, more tests

* Address mypy issues

* Update auth test with a session ID

* Delete volatile property from snapshot

---------

Co-authored-by: Pamela Fox <[email protected]>
  • Loading branch information
fujita-h and pamelafox authored Nov 12, 2024
1 parent 009d5e1 commit c3810e8
Show file tree
Hide file tree
Showing 30 changed files with 984 additions and 35 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ However, you can try the [Azure pricing calculator](https://azure.com/e/a87a169b
- Azure AI Document Intelligence: SO (Standard) tier using pre-built layout. Pricing per document page, sample documents have 261 pages total. [Pricing](https://azure.microsoft.com/pricing/details/form-recognizer/)
- Azure AI Search: Basic tier, 1 replica, free level of semantic search. Pricing per hour. [Pricing](https://azure.microsoft.com/pricing/details/search/)
- Azure Blob Storage: Standard tier with ZRS (Zone-redundant storage). Pricing per storage and read operations. [Pricing](https://azure.microsoft.com/pricing/details/storage/blobs/)
- Azure Cosmos DB: Serverless tier. Pricing per request unit and storage. [Pricing](https://azure.microsoft.com/pricing/details/cosmos-db/)
- Azure Monitor: Pay-as-you-go tier. Costs based on data ingested. [Pricing](https://azure.microsoft.com/pricing/details/monitor/)

To reduce costs, you can switch to free SKUs for various services, but those SKUs have limitations.
Expand Down
20 changes: 17 additions & 3 deletions app/backend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@
from approaches.chatreadretrievereadvision import ChatReadRetrieveReadVisionApproach
from approaches.retrievethenread import RetrieveThenReadApproach
from approaches.retrievethenreadvision import RetrieveThenReadVisionApproach
from chat_history.cosmosdb import chat_history_cosmosdb_bp
from config import (
CONFIG_ASK_APPROACH,
CONFIG_ASK_VISION_APPROACH,
CONFIG_AUTH_CLIENT,
CONFIG_BLOB_CONTAINER_CLIENT,
CONFIG_CHAT_APPROACH,
CONFIG_CHAT_HISTORY_BROWSER_ENABLED,
CONFIG_CHAT_HISTORY_COSMOS_ENABLED,
CONFIG_CHAT_VISION_APPROACH,
CONFIG_CREDENTIAL,
CONFIG_GPT4V_DEPLOYED,
Expand Down Expand Up @@ -224,7 +226,10 @@ async def chat(auth_claims: Dict[str, Any]):
# else creates a new session_id depending on the chat history options enabled.
session_state = request_json.get("session_state")
if session_state is None:
session_state = create_session_id(current_app.config[CONFIG_CHAT_HISTORY_BROWSER_ENABLED])
session_state = create_session_id(
current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED],
current_app.config[CONFIG_CHAT_HISTORY_BROWSER_ENABLED],
)
result = await approach.run(
request_json["messages"],
context=context,
Expand Down Expand Up @@ -255,7 +260,10 @@ async def chat_stream(auth_claims: Dict[str, Any]):
# else creates a new session_id depending on the chat history options enabled.
session_state = request_json.get("session_state")
if session_state is None:
session_state = create_session_id(current_app.config[CONFIG_CHAT_HISTORY_BROWSER_ENABLED])
session_state = create_session_id(
current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED],
current_app.config[CONFIG_CHAT_HISTORY_BROWSER_ENABLED],
)
result = await approach.run_stream(
request_json["messages"],
context=context,
Expand Down Expand Up @@ -289,6 +297,7 @@ def config():
"showSpeechOutputBrowser": current_app.config[CONFIG_SPEECH_OUTPUT_BROWSER_ENABLED],
"showSpeechOutputAzure": current_app.config[CONFIG_SPEECH_OUTPUT_AZURE_ENABLED],
"showChatHistoryBrowser": current_app.config[CONFIG_CHAT_HISTORY_BROWSER_ENABLED],
"showChatHistoryCosmos": current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED],
}
)

Expand Down Expand Up @@ -455,6 +464,7 @@ async def setup_clients():
USE_SPEECH_OUTPUT_BROWSER = os.getenv("USE_SPEECH_OUTPUT_BROWSER", "").lower() == "true"
USE_SPEECH_OUTPUT_AZURE = os.getenv("USE_SPEECH_OUTPUT_AZURE", "").lower() == "true"
USE_CHAT_HISTORY_BROWSER = os.getenv("USE_CHAT_HISTORY_BROWSER", "").lower() == "true"
USE_CHAT_HISTORY_COSMOS = os.getenv("USE_CHAT_HISTORY_COSMOS", "").lower() == "true"

# WEBSITE_HOSTNAME is always set by App Service, RUNNING_IN_PRODUCTION is set in main.bicep
RUNNING_ON_AZURE = os.getenv("WEBSITE_HOSTNAME") is not None or os.getenv("RUNNING_IN_PRODUCTION") is not None
Expand Down Expand Up @@ -484,6 +494,9 @@ async def setup_clients():
current_app.logger.info("Setting up Azure credential using AzureDeveloperCliCredential for home tenant")
azure_credential = AzureDeveloperCliCredential(process_timeout=60)

# Set the Azure credential in the app config for use in other parts of the app
current_app.config[CONFIG_CREDENTIAL] = azure_credential

# Set up clients for AI Search and Storage
search_client = SearchClient(
endpoint=f"https://{AZURE_SEARCH_SERVICE}.search.windows.net",
Expand Down Expand Up @@ -573,7 +586,6 @@ async def setup_clients():
current_app.config[CONFIG_SPEECH_SERVICE_VOICE] = AZURE_SPEECH_SERVICE_VOICE
# Wait until token is needed to fetch for the first time
current_app.config[CONFIG_SPEECH_SERVICE_TOKEN] = None
current_app.config[CONFIG_CREDENTIAL] = azure_credential

if OPENAI_HOST.startswith("azure"):
if OPENAI_HOST == "azure_custom":
Expand Down Expand Up @@ -628,6 +640,7 @@ async def setup_clients():
current_app.config[CONFIG_SPEECH_OUTPUT_BROWSER_ENABLED] = USE_SPEECH_OUTPUT_BROWSER
current_app.config[CONFIG_SPEECH_OUTPUT_AZURE_ENABLED] = USE_SPEECH_OUTPUT_AZURE
current_app.config[CONFIG_CHAT_HISTORY_BROWSER_ENABLED] = USE_CHAT_HISTORY_BROWSER
current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED] = USE_CHAT_HISTORY_COSMOS

# Various approaches to integrate GPT and external knowledge, most applications will use a single one of these patterns
# or some derivative, here we include several for exploration purposes
Expand Down Expand Up @@ -717,6 +730,7 @@ async def close_clients():
def create_app():
app = Quart(__name__)
app.register_blueprint(bp)
app.register_blueprint(chat_history_cosmosdb_bp)

if os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING"):
app.logger.info("APPLICATIONINSIGHTS_CONNECTION_STRING is set, enabling Azure Monitor")
Expand Down
Empty file.
192 changes: 192 additions & 0 deletions app/backend/chat_history/cosmosdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import os
import time
from typing import Any, Dict, Union

from azure.cosmos.aio import ContainerProxy, CosmosClient
from azure.identity.aio import AzureDeveloperCliCredential, ManagedIdentityCredential
from quart import Blueprint, current_app, jsonify, request

from config import (
CONFIG_CHAT_HISTORY_COSMOS_ENABLED,
CONFIG_COSMOS_HISTORY_CLIENT,
CONFIG_COSMOS_HISTORY_CONTAINER,
CONFIG_CREDENTIAL,
)
from decorators import authenticated
from error import error_response

chat_history_cosmosdb_bp = Blueprint("chat_history_cosmos", __name__, static_folder="static")


@chat_history_cosmosdb_bp.post("/chat_history")
@authenticated
async def post_chat_history(auth_claims: Dict[str, Any]):
if not current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED]:
return jsonify({"error": "Chat history not enabled"}), 400

container: ContainerProxy = current_app.config[CONFIG_COSMOS_HISTORY_CONTAINER]
if not container:
return jsonify({"error": "Chat history not enabled"}), 400

entra_oid = auth_claims.get("oid")
if not entra_oid:
return jsonify({"error": "User OID not found"}), 401

try:
request_json = await request.get_json()
id = request_json.get("id")
answers = request_json.get("answers")
title = answers[0][0][:50] + "..." if len(answers[0][0]) > 50 else answers[0][0]
timestamp = int(time.time() * 1000)

await container.upsert_item(
{"id": id, "entra_oid": entra_oid, "title": title, "answers": answers, "timestamp": timestamp}
)

return jsonify({}), 201
except Exception as error:
return error_response(error, "/chat_history")


@chat_history_cosmosdb_bp.post("/chat_history/items")
@authenticated
async def get_chat_history(auth_claims: Dict[str, Any]):
if not current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED]:
return jsonify({"error": "Chat history not enabled"}), 400

container: ContainerProxy = current_app.config[CONFIG_COSMOS_HISTORY_CONTAINER]
if not container:
return jsonify({"error": "Chat history not enabled"}), 400

entra_oid = auth_claims.get("oid")
if not entra_oid:
return jsonify({"error": "User OID not found"}), 401

try:
request_json = await request.get_json()
count = request_json.get("count", 20)
continuation_token = request_json.get("continuation_token")

res = container.query_items(
query="SELECT c.id, c.entra_oid, c.title, c.timestamp FROM c WHERE c.entra_oid = @entra_oid ORDER BY c.timestamp DESC",
parameters=[dict(name="@entra_oid", value=entra_oid)],
max_item_count=count,
)

# set the continuation token for the next page
pager = res.by_page(continuation_token)

# Get the first page, and the continuation token
try:
page = await pager.__anext__()
continuation_token = pager.continuation_token # type: ignore

items = []
async for item in page:
items.append(
{
"id": item.get("id"),
"entra_oid": item.get("entra_oid"),
"title": item.get("title", "untitled"),
"timestamp": item.get("timestamp"),
}
)

# If there are no more pages, StopAsyncIteration is raised
except StopAsyncIteration:
items = []
continuation_token = None

return jsonify({"items": items, "continuation_token": continuation_token}), 200

except Exception as error:
return error_response(error, "/chat_history/items")


@chat_history_cosmosdb_bp.get("/chat_history/items/<item_id>")
@authenticated
async def get_chat_history_session(auth_claims: Dict[str, Any], item_id: str):
if not current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED]:
return jsonify({"error": "Chat history not enabled"}), 400

container: ContainerProxy = current_app.config[CONFIG_COSMOS_HISTORY_CONTAINER]
if not container:
return jsonify({"error": "Chat history not enabled"}), 400

entra_oid = auth_claims.get("oid")
if not entra_oid:
return jsonify({"error": "User OID not found"}), 401

try:
res = await container.read_item(item=item_id, partition_key=entra_oid)
return (
jsonify(
{
"id": res.get("id"),
"entra_oid": res.get("entra_oid"),
"title": res.get("title", "untitled"),
"timestamp": res.get("timestamp"),
"answers": res.get("answers", []),
}
),
200,
)
except Exception as error:
return error_response(error, f"/chat_history/items/{item_id}")


@chat_history_cosmosdb_bp.delete("/chat_history/items/<item_id>")
@authenticated
async def delete_chat_history_session(auth_claims: Dict[str, Any], item_id: str):
if not current_app.config[CONFIG_CHAT_HISTORY_COSMOS_ENABLED]:
return jsonify({"error": "Chat history not enabled"}), 400

container: ContainerProxy = current_app.config[CONFIG_COSMOS_HISTORY_CONTAINER]
if not container:
return jsonify({"error": "Chat history not enabled"}), 400

entra_oid = auth_claims.get("oid")
if not entra_oid:
return jsonify({"error": "User OID not found"}), 401

try:
await container.delete_item(item=item_id, partition_key=entra_oid)
return jsonify({}), 204
except Exception as error:
return error_response(error, f"/chat_history/items/{item_id}")


@chat_history_cosmosdb_bp.before_app_serving
async def setup_clients():
USE_CHAT_HISTORY_COSMOS = os.getenv("USE_CHAT_HISTORY_COSMOS", "").lower() == "true"
AZURE_COSMOSDB_ACCOUNT = os.getenv("AZURE_COSMOSDB_ACCOUNT")
AZURE_CHAT_HISTORY_DATABASE = os.getenv("AZURE_CHAT_HISTORY_DATABASE")
AZURE_CHAT_HISTORY_CONTAINER = os.getenv("AZURE_CHAT_HISTORY_CONTAINER")

azure_credential: Union[AzureDeveloperCliCredential, ManagedIdentityCredential] = current_app.config[
CONFIG_CREDENTIAL
]

if USE_CHAT_HISTORY_COSMOS:
current_app.logger.info("USE_CHAT_HISTORY_COSMOS is true, setting up CosmosDB client")
if not AZURE_COSMOSDB_ACCOUNT:
raise ValueError("AZURE_COSMOSDB_ACCOUNT must be set when USE_CHAT_HISTORY_COSMOS is true")
if not AZURE_CHAT_HISTORY_DATABASE:
raise ValueError("AZURE_CHAT_HISTORY_DATABASE must be set when USE_CHAT_HISTORY_COSMOS is true")
if not AZURE_CHAT_HISTORY_CONTAINER:
raise ValueError("AZURE_CHAT_HISTORY_CONTAINER must be set when USE_CHAT_HISTORY_COSMOS is true")
cosmos_client = CosmosClient(
url=f"https://{AZURE_COSMOSDB_ACCOUNT}.documents.azure.com:443/", credential=azure_credential
)
cosmos_db = cosmos_client.get_database_client(AZURE_CHAT_HISTORY_DATABASE)
cosmos_container = cosmos_db.get_container_client(AZURE_CHAT_HISTORY_CONTAINER)

current_app.config[CONFIG_COSMOS_HISTORY_CLIENT] = cosmos_client
current_app.config[CONFIG_COSMOS_HISTORY_CONTAINER] = cosmos_container


@chat_history_cosmosdb_bp.after_app_serving
async def close_clients():
if current_app.config.get(CONFIG_COSMOS_HISTORY_CLIENT):
cosmos_client: CosmosClient = current_app.config[CONFIG_COSMOS_HISTORY_CLIENT]
await cosmos_client.close()
3 changes: 3 additions & 0 deletions app/backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@
CONFIG_SPEECH_SERVICE_TOKEN = "speech_service_token"
CONFIG_SPEECH_SERVICE_VOICE = "speech_service_voice"
CONFIG_CHAT_HISTORY_BROWSER_ENABLED = "chat_history_browser_enabled"
CONFIG_CHAT_HISTORY_COSMOS_ENABLED = "chat_history_cosmos_enabled"
CONFIG_COSMOS_HISTORY_CLIENT = "cosmos_history_client"
CONFIG_COSMOS_HISTORY_CONTAINER = "cosmos_history_container"
6 changes: 5 additions & 1 deletion app/backend/core/sessionhelper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
from typing import Union


def create_session_id(config_chat_history_browser_enabled: bool) -> Union[str, None]:
def create_session_id(
config_chat_history_cosmos_enabled: bool, config_chat_history_browser_enabled: bool
) -> Union[str, None]:
if config_chat_history_cosmos_enabled:
return str(uuid.uuid4())
if config_chat_history_browser_enabled:
return str(uuid.uuid4())
return None
13 changes: 8 additions & 5 deletions app/backend/decorators.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from functools import wraps
from typing import Any, Callable, Dict
from typing import Any, Callable, Dict, TypeVar, cast

from quart import abort, current_app, request

Expand Down Expand Up @@ -37,19 +37,22 @@ async def auth_handler(path=""):
return auth_handler


def authenticated(route_fn: Callable[[Dict[str, Any]], Any]):
_C = TypeVar("_C", bound=Callable[..., Any])


def authenticated(route_fn: _C) -> _C:
"""
Decorator for routes that might require access control. Unpacks Authorization header information into an auth_claims dictionary
"""

@wraps(route_fn)
async def auth_handler():
async def auth_handler(*args, **kwargs):
auth_helper = current_app.config[CONFIG_AUTH_CLIENT]
try:
auth_claims = await auth_helper.get_auth_claims_if_enabled(request.headers)
except AuthError:
abort(403)

return await route_fn(auth_claims)
return await route_fn(auth_claims, *args, **kwargs)

return auth_handler
return cast(_C, auth_handler)
1 change: 1 addition & 0 deletions app/backend/requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ tiktoken
tenacity
azure-ai-documentintelligence
azure-cognitiveservices-speech
azure-cosmos
azure-search-documents==11.6.0b6
azure-storage-blob
azure-storage-file-datalake
Expand Down
4 changes: 4 additions & 0 deletions app/backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ azure-core==1.30.2
# via
# azure-ai-documentintelligence
# azure-core-tracing-opentelemetry
# azure-cosmos
# azure-identity
# azure-monitor-opentelemetry
# azure-monitor-opentelemetry-exporter
Expand All @@ -44,6 +45,8 @@ azure-core==1.30.2
# msrest
azure-core-tracing-opentelemetry==1.0.0b11
# via azure-monitor-opentelemetry
azure-cosmos==4.7.0
# via -r requirements.in
azure-identity==1.17.1
# via
# -r requirements.in
Expand Down Expand Up @@ -402,6 +405,7 @@ typing-extensions==4.12.2
# via
# azure-ai-documentintelligence
# azure-core
# azure-cosmos
# azure-identity
# azure-search-documents
# azure-storage-blob
Expand Down
Loading

0 comments on commit c3810e8

Please sign in to comment.