Skip to content

Commit

Permalink
fix(frontend|backend) - adjust conversation apis to have improved end…
Browse files Browse the repository at this point in the history
…point url /conversation/{session_id}/...

  -> History = GET /conversations/{session_id}/messages
  -> Send Message = POST /conversations/{session_id}/messages
  -> Experiences = /conversations/{session_id}/experiences
- move conversation api to its own module
- adjust frontend service to call on updated endpoints
- add tests to conversation routes.py

feat(backend) - add reaction endpoints
 -> Upsert reaction = PUT /conversations/{session_id}/messages/{message_id}/reactions
 -> Delete reaction = DELETE /conversations/{session_id}/messages/{message_id}/reactions

feat(backend) - add api feature to block adding reaction to user message

chore(frontend): - implement reactions button component
 - add reaction service
 - add reaction reason popover

chore(backend) - integrate frontend/backend
fix(backend) - change /reaction to /reactions

chore(backend) - prettier format [pulumi up]

chore(backend) - move validation tests to test_model
- add checks to verify user is accessing reactions on messages in their own session

chore: adjust alignment and font size for message reactions
- fix state issues on popover close
- expect 201 on add reaction in storybook

fix(frontend): rebase footers refactor

fix(backend): move user message check logic to conversation_memory_manager

chore(backend) - add migration script that adds message_id to all the existing messages

fix

chore(backend) - review
  • Loading branch information
rav3n11 authored and ApostolosBenisis committed Jan 24, 2025
1 parent 3dd7e40 commit 6e05069
Show file tree
Hide file tree
Showing 102 changed files with 5,059 additions and 806 deletions.
9 changes: 6 additions & 3 deletions backend/app/agent/agent_director/abstract_agent_director.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from app.agent.agent_types import AgentInput, AgentOutput
from app.conversation_memory.conversation_memory_manager import \
ConversationMemoryManager
ConversationMemoryManager, IConversationMemoryManager


class ConversationPhase(Enum):
Expand Down Expand Up @@ -63,7 +63,10 @@ def deserialize_conversation_conducted_at(cls, value: Optional[datetime]) -> Opt
def from_document(_doc: Mapping[str, Any]) -> "AgentDirectorState":
return AgentDirectorState(session_id=_doc["session_id"],
current_phase=_doc["current_phase"],
conversation_conducted_at=_doc["conversation_conducted_at"])
# The conversation_conducted_at field was introduced later, so it may not exist in all documents
# For the documents that don't have this field, we'll default to None,
# The implication being that the client will have to handle this case.
conversation_conducted_at=_doc.get("conversation_conducted_at", None))


def _parse_data(value: Optional[datetime | str]) -> Optional[datetime]:
Expand All @@ -90,7 +93,7 @@ class AbstractAgentDirector(ABC):
It maintains the state of the conversation which is divided into phases.
"""

def __init__(self, conversation_manager: ConversationMemoryManager):
def __init__(self, conversation_manager: IConversationMemoryManager):
# Initialize the logger
self._logger = logging.getLogger(self.__class__.__name__)

Expand Down
6 changes: 6 additions & 0 deletions backend/app/agent/agent_types.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from enum import Enum
from typing import Optional
from datetime import datetime, timezone
from bson import ObjectId

from pydantic import BaseModel, Field, field_validator, field_serializer

Expand All @@ -22,6 +23,8 @@ class AgentInput(BaseModel):
"""
The input to an agent
"""
message_id: Optional[str] = Field(default_factory=lambda: str(ObjectId()))
"""A unique id for the message"""

message: str # Bad idea, rename
"""The message from the user"""
Expand Down Expand Up @@ -76,6 +79,9 @@ class AgentOutput(BaseModel):
"""
The output of an agent
"""
message_id: Optional[str] = Field(default_factory=lambda: str(ObjectId()))
"""A unique id for the message"""

message_for_user: str
"""The message for the user"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class Config:
def from_document(_doc: Mapping[str, Any]) -> "SkillsExplorerAgentState":
return SkillsExplorerAgentState(session_id=_doc["session_id"],
first_time_for_experience=_doc["first_time_for_experience"],
experiences_explored=_doc["experiences_explored"] )
experiences_explored=_doc["experiences_explored"])


class SkillsExplorerAgent(Agent):
Expand Down
62 changes: 50 additions & 12 deletions backend/app/application_state.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from abc import ABC, abstractmethod
from pydantic import BaseModel
from typing import AsyncIterator

from app.agent.agent_director.abstract_agent_director import AgentDirectorState
from app.agent.collect_experiences_agent import CollectExperiencesAgentState
Expand Down Expand Up @@ -75,8 +76,53 @@ async def delete_state(self, session_id: int) -> None:
"""
raise NotImplementedError

@abstractmethod
async def get_all_session_ids(self) -> AsyncIterator[int]:
"""
Get all session ID
"""
raise NotImplementedError


class IApplicationStateManager(ABC):
"""
Interface for the application state manager
Allows to mock the class in tests
"""

@abstractmethod
async def get_state(self, session_id: int) -> ApplicationState:
"""
Get the application state for a session
If the state does not exist, a new state is created and stored in
the store prior to returning it.
"""
raise NotImplementedError()

@abstractmethod
async def save_state(self, state: ApplicationState):
"""
Save the application state for a session
"""
raise NotImplementedError()

class ApplicationStateManager:
@abstractmethod
async def delete_state(self, session_id: int) -> None:
"""
Delete the application state for a session
"""
raise NotImplementedError()

@abstractmethod
async def get_all_session_ids(self) -> AsyncIterator[int]:
"""
Get all application states
"""
raise NotImplementedError()


class ApplicationStateManager(IApplicationStateManager):
"""
The application state manager is responsible for managing the application state.
it delegates the storage and retrieval of the state to an application state store.
Expand All @@ -87,11 +133,6 @@ def __init__(self, store: ApplicationStateStore):
self.logger = logging.getLogger(self.__class__.__name__)

async def get_state(self, session_id: int) -> ApplicationState:
"""
Get the application state for a session
If the state does not exist, a new state is created and stored in
the store prior to returning it.
"""
state = await self._store.get_state(session_id)
if state is None:
state = ApplicationState(
Expand All @@ -107,13 +148,10 @@ async def get_state(self, session_id: int) -> ApplicationState:
return state

async def save_state(self, state: ApplicationState):
"""
Save the application state for a session
"""
return await self._store.save_state(state)

async def delete_state(self, session_id: int) -> None:
"""
Delete the application state for a session
"""
return await self._store.delete_state(session_id)

async def get_all_session_ids(self) -> AsyncIterator[int]:
raise NotImplementedError("Not implemented yet")
87 changes: 63 additions & 24 deletions backend/app/conversation_memory/conversation_memory_manager.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,58 @@
import logging

from abc import ABC, abstractmethod

from app.agent.agent_types import AgentInput, AgentOutput
from app.conversation_memory.conversation_memory_types import ConversationHistory, \
ConversationContext, ConversationTurn, ConversationMemoryManagerState
from app.conversation_memory.summarizer import Summarizer


class ConversationMemoryManager:
class IConversationMemoryManager(ABC):
"""
Interface for the conversation memory manager
Allows to mock the class in tests
"""

@abstractmethod
def set_state(self, state: ConversationMemoryManagerState):
"""
Set the conversation memory manager state
:param state: the manager state
"""
raise NotImplementedError()

@abstractmethod
async def get_conversation_context(self):
"""
Get the conversation context for a session that has been summarized as needed and should be passed to an agent.
:return: The conversation context
"""
raise NotImplementedError()

@abstractmethod
async def update_history(self, user_input: AgentInput, agent_output: AgentOutput):
"""
Update the conversation history for a session by appending the user input and agent output to the history.
Additionally, the history will be summarized if the to be summarized history window is full
:param user_input: The user input
:param agent_output: The agent output
"""
raise NotImplementedError()

@abstractmethod
async def is_user_message(self, message_id: str) -> bool:
"""
Utility method that checks if a message with a certain message_id comes from the user or not
:param message_id: the id of the message to check
:return - bool: True if the message is a User message, False if it is a Compass message
:raises ValueError: if the message_id is not found in the conversation history
"""


class ConversationMemoryManager(IConversationMemoryManager):
"""
Manages the conversation history
"""
Expand All @@ -19,29 +65,18 @@ def __init__(self, unsummarized_window_size, to_be_summarized_window_size):
self._logger = logging.getLogger(self.__class__.__name__)

def set_state(self, state: ConversationMemoryManagerState):
"""
Set the conversation memory manager state
:param state: the manager state
"""
self._state = state

async def get_conversation_context(self) -> ConversationContext:
"""
Get the conversation context for a session that has been summarized as needed and should be passed to an agent.
:return: The conversation context
"""
return ConversationContext(
all_history=self._state.all_history,
history=ConversationHistory(
turns=(self._state.to_be_summarized_history.turns + self._state.unsummarized_history.turns)
),
summary=self._state.summary
all_history=self._state.all_history,
history=ConversationHistory(
turns=(self._state.to_be_summarized_history.turns + self._state.unsummarized_history.turns)
),
summary=self._state.summary
)

async def _summarize(self):
"""
Update the conversation summary to include the given history input
"""
try:
# Generate the new summary
self._logger.debug("Summarizing conversation:")
Expand All @@ -56,13 +91,6 @@ async def _summarize(self):
self._logger.error("Error summarizing conversation: %s", e, exc_info=True)

async def update_history(self, user_input: AgentInput, agent_output: AgentOutput) -> None:
"""
Update the conversation history for a session by appending the user input and agent output to the history.
Additionally, the history will be summarized if the to be summarized history window is full
:param user_input: The user input
:param agent_output: The agent output
"""
count = len(self._state.all_history.turns) + 1
turn = ConversationTurn(index=count, input=user_input, output=agent_output)
self._state.all_history.turns.append(turn)
Expand All @@ -75,3 +103,14 @@ async def update_history(self, user_input: AgentInput, agent_output: AgentOutput
# If the to_be_summarized_history window is full, we perform summarization
if len(self._state.to_be_summarized_history.turns) == self._to_be_summarized_window_size:
await self._summarize()

async def is_user_message(self, message_id: str) -> bool:
# find out if the message_id of the message to react to is found an input message
# if it is in the turns as an input, that means it was a user message
for turn in self._state.all_history.turns:
if turn.input.message_id == message_id:
return True
if turn.output.message_id == message_id:
return False

raise ValueError(f"Message with id {message_id} not found in conversation history")
File renamed without changes.
7 changes: 7 additions & 0 deletions backend/app/conversations/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""
Constants related to conversations.
"""

MAX_MESSAGE_LENGTH = 1000

UNEXPECTED_FAILURE_MESSAGE = "Oops! something went wrong"
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from fastapi import FastAPI, APIRouter

from app.poc.poc_routes import add_poc_route_endpoints
from app.conversations.poc.poc_routes import add_poc_route_endpoints
from app.users.auth import Authentication


Expand Down
File renamed without changes.
Empty file.
97 changes: 97 additions & 0 deletions backend/app/conversations/reactions/repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""
This module contains the repository layer for handling reactions.
"""
import logging
from abc import ABC, abstractmethod
from typing import Optional, List

from motor.motor_asyncio import AsyncIOMotorDatabase

from app.server_dependencies.database_collections import Collections
from app.conversations.reactions.types import Reaction


class IReactionRepository(ABC):
"""
Interface for the reaction repository.
Allows to mock the repository in tests
"""

@abstractmethod
async def add(self, reaction: Reaction) -> Optional[str]:
"""
Creates or updates a reaction.
:param reaction: ReactionModel - the reaction to create or update
:return: str - the id of the created/updated reaction document id
"""
raise NotImplementedError()

@abstractmethod
async def delete(self, session_id: int, message_id: str):
"""
Deletes a reaction.
:param session_id: the id of the session containing the message
:param message_id: the id of the message that was reacted to
"""
raise NotImplementedError()

@abstractmethod
async def get_reactions(self, session_id: int) -> Optional[List[Reaction]]:
"""
Gets the full list of reactions for the given session
:param session_id: the id of the session containing the messages
:return: Optional[List[Reaction]] - the reactions if found, otherwise None
"""
raise NotImplementedError()


class ReactionRepository(IReactionRepository):
def __init__(self, db: AsyncIOMotorDatabase):
self._db = db
self._logger = logging.getLogger(ReactionRepository.__name__)
self._collection = db.get_collection(Collections.REACTIONS)

async def add(self, reaction: Reaction) -> Optional[str]:
# Convert the pydantic model to a dictionary
payload = reaction.model_dump()

# Use upsert to either create a new document or update an existing one
# We use session_id and message_id as the unique key
result = await self._collection.update_one(
{
"session_id": {"$eq": reaction.session_id},
"message_id": {"$eq": reaction.message_id}
},
{"$set": payload},
upsert=True
)

# Return the upserted ID if it was an insert, otherwise None
return str(result.upserted_id) if result.upserted_id else None

async def delete(self, session_id: int, message_id: str):
await self._collection.delete_one({
"session_id": {"$eq": session_id},
"message_id": {"$eq": message_id}
})

async def get_reactions(self, session_id: int) -> Optional[List[Reaction]]:
"""
Gets the full list of reactions for a session
:param session_id: the id of the session containing the message
:return: Optional[List[Reaction]] - the list of reactions if found, otherwise None
"""
cursor = self._collection.find({
"session_id": {"$eq": session_id},
})

reactions = []
async for doc in cursor:
reactions.append(Reaction.from_dict(doc))

return reactions if reactions else None
Loading

0 comments on commit 6e05069

Please sign in to comment.