-
Notifications
You must be signed in to change notification settings - Fork 5.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
initial implementation for vectordb #3879
base: main
Are you sure you want to change the base?
Changes from 16 commits
7da80e9
f4f3c3f
347fb60
4b886b0
b9b72d6
ae4d8ae
9fdc30c
1d59e51
299a2eb
2354a49
1a147cb
ab7fd07
5745849
4391961
b3f0672
0eba50a
d427e0b
5116a52
fc56024
0b952d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
from ._chromadb import AsyncChromaVectorDB, ChromaVectorDB | ||
from ._factory import VectorDBFactory | ||
|
||
__all__ = ["ChromaVectorDB", "AsyncChromaVectorDB", "VectorDBFactory"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,369 @@ | ||
from typing import ( | ||
Any, | ||
Callable, | ||
List, | ||
Mapping, | ||
Optional, | ||
Protocol, | ||
Sequence, | ||
Tuple, | ||
Union, | ||
runtime_checkable, | ||
) | ||
|
||
from pydantic import BaseModel | ||
|
||
Metadata = Union[Mapping[str, Any], None] | ||
Vector = Union[Sequence[float], Sequence[int]] | ||
ItemID = Union[str, int] | ||
|
||
|
||
class Document(BaseModel): | ||
"""Define Document according to autogen 0.4 specifications.""" | ||
|
||
id: ItemID | ||
content: Optional[str] = None | ||
metadata: Optional[Metadata] = None | ||
embedding: Optional[Vector] = None | ||
|
||
model_config = {"arbitrary_types_allowed": True} | ||
|
||
|
||
"""QueryResults is the response from the vector database for a query/queries. | ||
A query is a list containing one string while queries is a list containing multiple strings. | ||
The response is a list of query results, each query result is a list of tuples containing the document and the distance. | ||
""" | ||
QueryResults = List[List[Tuple[Document, float]]] | ||
|
||
|
||
@runtime_checkable | ||
class AsyncVectorDB(Protocol): | ||
""" | ||
Abstract class for async vector database. A vector database is responsible for storing and retrieving documents. | ||
|
||
Attributes: | ||
active_collection: Any | The active collection in the vector database. Make get_collection faster. Default is None. | ||
type: str | The type of the vector database, chroma, pgvector, etc. Default is "". | ||
|
||
Methods: | ||
create_collection: Callable[[str, bool, bool], Awaitable[Any]] | Create a collection in the vector database. | ||
get_collection: Callable[[str], Awaitable[Any]] | Get the collection from the vector database. | ||
delete_collection: Callable[[str], Awaitable[Any]] | Delete the collection from the vector database. | ||
insert_docs: Callable[[List[Document], str, bool], Awaitable[None]] | Insert documents into the collection of the vector database. | ||
update_docs: Callable[[List[Document], str], Awaitable[None]] | Update documents in the collection of the vector database. | ||
delete_docs: Callable[[List[ItemID], str], Awaitable[None]] | Delete documents from the collection of the vector database. | ||
retrieve_docs: Callable[[List[str], str, int, float], Awaitable[QueryResults]] | Retrieve documents from the collection of the vector database based on the queries. | ||
get_docs_by_ids: Callable[[List[ItemID], str], Awaitable[List[Document]]] | Retrieve documents from the collection of the vector database based on the ids. | ||
""" | ||
|
||
active_collection: Any = None | ||
type: str = "" | ||
embedding_function: Optional[Callable[..., Any]] = None # embeddings = embedding_function(sentences) | ||
|
||
async def create_collection(self, collection_name: str, overwrite: bool = False, get_or_create: bool = True) -> Any: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. some vector store requires schema to create collection and some other parameters, may using variable args here can help generalise There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is a great callout. Checked the old code and those additional arguments were being passed in through the class constructor. Not the best design. Pushing a change as soon as the checks complete. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should be updated now |
||
""" | ||
Create a collection in the vector database. | ||
Case 1. if the collection does not exist, create the collection. | ||
Case 2. the collection exists, if overwrite is True, it will overwrite the collection. | ||
Case 3. the collection exists and overwrite is False, if get_or_create is True, it will get the collection, | ||
otherwise it raise a ValueError. | ||
|
||
Args: | ||
collection_name: str | The name of the collection. | ||
overwrite: bool | Whether to overwrite the collection if it exists. Default is False. | ||
get_or_create: bool | Whether to get the collection if it exists. Default is True. | ||
|
||
Returns: | ||
Any | The collection object. | ||
""" | ||
... | ||
|
||
async def get_collection(self, collection_name: Optional[str] = None) -> Any: | ||
""" | ||
Get the collection from the vector database. | ||
|
||
Args: | ||
collection_name: Optional[str] | The name of the collection. Default is None. | ||
If None, return the current active collection. | ||
|
||
Returns: | ||
Any | The collection object. | ||
""" | ||
... | ||
|
||
async def delete_collection(self, collection_name: str) -> Any: | ||
""" | ||
Delete the collection from the vector database. | ||
|
||
Args: | ||
collection_name: str | The name of the collection. | ||
|
||
Returns: | ||
Any | ||
""" | ||
... | ||
|
||
async def insert_docs( | ||
self, | ||
docs: List[Document], | ||
collection_name: Optional[str] = None, | ||
upsert: bool = False, | ||
**kwargs: Any, | ||
) -> None: | ||
""" | ||
Insert documents into the collection of the vector database. | ||
|
||
Args: | ||
docs: List[Document] | A list of documents. Each document is a Pydantic Document model. | ||
collection_name: Optional[str] | The name of the collection. Default is None. | ||
upsert: bool | Whether to update the document if it exists. Default is False. | ||
kwargs: Dict[str, Any] | Additional keyword arguments. | ||
|
||
Returns: | ||
None | ||
""" | ||
... | ||
|
||
async def update_docs(self, docs: List[Document], collection_name: Optional[str] = None, **kwargs: Any) -> None: | ||
""" | ||
Update documents in the collection of the vector database. | ||
|
||
Args: | ||
docs: List[Document] | A list of documents. | ||
collection_name: Optional[str] | The name of the collection. Default is None. | ||
kwargs: Dict[str, Any] | Additional keyword arguments. | ||
|
||
Returns: | ||
None | ||
""" | ||
... | ||
|
||
async def delete_docs(self, ids: List[ItemID], collection_name: Optional[str] = None, **kwargs: Any) -> None: | ||
""" | ||
Delete documents from the collection of the vector database. | ||
|
||
Args: | ||
ids: List[ItemID] | A list of document ids. Each id is a typed `ItemID`. | ||
collection_name: Optional[str] | The name of the collection. Default is None. | ||
kwargs: Dict[str, Any] | Additional keyword arguments. | ||
|
||
Returns: | ||
None | ||
""" | ||
... | ||
|
||
async def retrieve_docs( | ||
self, | ||
queries: List[str], | ||
collection_name: Optional[str] = None, | ||
n_results: int = 10, | ||
distance_threshold: float = -1, | ||
**kwargs: Any, | ||
) -> QueryResults: | ||
""" | ||
Retrieve documents from the collection of the vector database based on the queries. | ||
|
||
Args: | ||
queries: List[str] | A list of queries. Each query is a string. | ||
collection_name: Optional[str] | The name of the collection. Default is None. | ||
n_results: int | The number of relevant documents to return. Default is 10. | ||
distance_threshold: float | The threshold for the distance score, only distance smaller than it will be | ||
returned. Don't filter with it if < 0. Default is -1. | ||
kwargs: Dict[str, Any] | Additional keyword arguments. | ||
|
||
Returns: | ||
QueryResults | The query results. Each query result is a list of list of tuples containing the document and | ||
the distance. | ||
""" | ||
... | ||
|
||
async def get_docs_by_ids( | ||
self, | ||
ids: Optional[List[ItemID]] = None, | ||
collection_name: Optional[str] = None, | ||
include: Optional[List[str]] = None, | ||
**kwargs: Any, | ||
) -> List[Document]: | ||
""" | ||
Retrieve documents from the collection of the vector database based on the ids. | ||
|
||
Args: | ||
ids: Optional[List[ItemID]] | A list of document ids. If None, will return all the documents. Default is None. | ||
collection_name: Optional[str] | The name of the collection. Default is None. | ||
include: Optional[List[str]] | The fields to include. Default is None. | ||
If None, will include ["metadatas", "documents"], ids will always be included. This may differ | ||
depending on the implementation. | ||
kwargs: Dict[str, Any] | Additional keyword arguments. | ||
|
||
Returns: | ||
List[Document] | The results. | ||
""" | ||
... | ||
|
||
|
||
@runtime_checkable | ||
class VectorDB(Protocol): | ||
""" | ||
Abstract class for synchronous vector database. A vector database is responsible for storing and retrieving documents. | ||
For async support, use AsyncVectorDB instead. | ||
|
||
Attributes: | ||
active_collection: Any | The active collection in the vector database. Make get_collection faster. Default is None. | ||
type: str | The type of the vector database, chroma, pgvector, etc. Default is "". | ||
|
||
Methods: | ||
create_collection: Callable[[str, bool, bool], Any] | Create a collection in the vector database. | ||
get_collection: Callable[[str], Any] | Get the collection from the vector database. | ||
delete_collection: Callable[[str], Any] | Delete the collection from the vector database. | ||
insert_docs: Callable[[List[Document], str, bool], None] | Insert documents into the collection of the vector database. | ||
update_docs: Callable[[List[Document], str], None] | Update documents in the collection of the vector database. | ||
delete_docs: Callable[[List[ItemID], str], None] | Delete documents from the collection of the vector database. | ||
retrieve_docs: Callable[[List[str], str, int, float], QueryResults] | Retrieve documents from the collection of the vector database based on the queries. | ||
get_docs_by_ids: Callable[[List[ItemID], str], List[Document]] | Retrieve documents from the collection of the vector database based on the ids. | ||
""" | ||
|
||
active_collection: Any = None | ||
type: str = "" | ||
embedding_function: Optional[Callable[[List[str]], List[List[float]]]] = ( | ||
None # embeddings = embedding_function(sentences) | ||
) | ||
|
||
def create_collection(self, collection_name: str, overwrite: bool = False, get_or_create: bool = True) -> Any: | ||
""" | ||
Create a collection in the vector database. | ||
Case 1. if the collection does not exist, create the collection. | ||
Case 2. the collection exists, if overwrite is True, it will overwrite the collection. | ||
Case 3. the collection exists and overwrite is False, if get_or_create is True, it will get the collection, | ||
otherwise it raise a ValueError. | ||
|
||
Args: | ||
collection_name: str | The name of the collection. | ||
overwrite: bool | Whether to overwrite the collection if it exists. Default is False. | ||
get_or_create: bool | Whether to get the collection if it exists. Default is True. | ||
|
||
Returns: | ||
Any | The collection object. | ||
""" | ||
... | ||
|
||
def get_collection(self, collection_name: Optional[str] = None) -> Any: | ||
""" | ||
Get the collection from the vector database. | ||
|
||
Args: | ||
collection_name: Optional[str] | The name of the collection. Default is None. | ||
If None, return the current active collection. | ||
|
||
Returns: | ||
Any | The collection object. | ||
""" | ||
... | ||
|
||
def delete_collection(self, collection_name: str) -> Any: | ||
""" | ||
Delete the collection from the vector database. | ||
|
||
Args: | ||
collection_name: str | The name of the collection. | ||
|
||
Returns: | ||
Any | ||
""" | ||
... | ||
|
||
def insert_docs( | ||
self, | ||
docs: List[Document], | ||
collection_name: Optional[str] = None, | ||
upsert: bool = False, | ||
**kwargs: Any, | ||
) -> None: | ||
""" | ||
Insert documents into the collection of the vector database. | ||
|
||
Args: | ||
docs: List[Document] | A list of documents. Each document is a Pydantic Document model. | ||
collection_name: Optional[str] | The name of the collection. Default is None. | ||
upsert: bool | Whether to update the document if it exists. Default is False. | ||
kwargs: Dict[str, Any] | Additional keyword arguments. | ||
|
||
Returns: | ||
None | ||
""" | ||
... | ||
|
||
def update_docs(self, docs: List[Document], collection_name: Optional[str] = None, **kwargs: Any) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the most used method is upsert There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the protocol, my understanding is that the idea is to keep it closer in design the DB theory and enforce CRUD like operations. Implementations can add upsert but they probably shouldn't be forced to |
||
""" | ||
Update documents in the collection of the vector database. | ||
|
||
Args: | ||
docs: List[Document] | A list of documents. | ||
collection_name: Optional[str] | The name of the collection. Default is None. | ||
kwargs: Dict[str, Any] | Additional keyword arguments. | ||
|
||
Returns: | ||
None | ||
""" | ||
... | ||
|
||
def delete_docs(self, ids: List[ItemID], collection_name: Optional[str] = None, **kwargs: Any) -> None: | ||
""" | ||
Delete documents from the collection of the vector database. | ||
|
||
Args: | ||
ids: List[ItemID] | A list of document ids. Each id is a typed `ItemID`. | ||
collection_name: Optional[str] | The name of the collection. Default is None. | ||
kwargs: Dict[str, Any] | Additional keyword arguments. | ||
|
||
Returns: | ||
None | ||
""" | ||
... | ||
|
||
def retrieve_docs( | ||
self, | ||
queries: List[str], | ||
collection_name: Optional[str] = None, | ||
n_results: int = 10, | ||
distance_threshold: float = -1, | ||
**kwargs: Any, | ||
) -> QueryResults: | ||
""" | ||
Retrieve documents from the collection of the vector database based on the queries. | ||
|
||
Args: | ||
queries: List[str] | A list of queries. Each query is a string. | ||
collection_name: Optional[str] | The name of the collection. Default is None. | ||
n_results: int | The number of relevant documents to return. Default is 10. | ||
distance_threshold: float | The threshold for the distance score, only distance smaller than it will be | ||
returned. Don't filter with it if < 0. Default is -1. | ||
kwargs: Dict[str, Any] | Additional keyword arguments. | ||
|
||
Returns: | ||
QueryResults | The query results. Each query result is a list of list of tuples containing the document and | ||
the distance. | ||
""" | ||
... | ||
|
||
def get_docs_by_ids( | ||
self, | ||
ids: Optional[List[ItemID]] = None, | ||
collection_name: Optional[str] = None, | ||
include: Optional[List[str]] = None, | ||
**kwargs: Any, | ||
) -> List[Document]: | ||
""" | ||
Retrieve documents from the collection of the vector database based on the ids. | ||
|
||
Args: | ||
ids: Optional[List[ItemID]] | A list of document ids. If None, will return all the documents. Default is None. | ||
collection_name: Optional[str] | The name of the collection. Default is None. | ||
include: Optional[List[str]] | The fields to include. Default is None. | ||
If None, will include ["metadatas", "documents"], ids will always be included. This may differ | ||
depending on the implementation. | ||
kwargs: Dict[str, Any] | Additional keyword arguments. | ||
|
||
Returns: | ||
List[Document] | The results. | ||
""" | ||
... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why assuming string?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the same representation used in 0.2 and I believe it assumes the content has already been preprocessed to a string format. More complex data types can lead to more complex operations in general. Would you suggest something different?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the vector store can have a payload object, which is usually a json. Unless you know the schema mapping returning string is a problem, maybe we can use dictionary or some other model?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is meant to capture the full payload but only the main text content of the associated document and the transformation between autogen doc and vectordb doc is handled in each implementation. The pydantic model is defined with
arbitrary_types_allowed
which should allow implementations to unpack additional attributes as needed at the document level instead of the content. Would that work for the scenario you have in mind?