From 63fd78a7528a64f952aed1695b24a11aeb0b0469 Mon Sep 17 00:00:00 2001 From: jiangzhijie Date: Sun, 4 Jan 2026 14:50:05 +0800 Subject: [PATCH 1/3] feat(rag): add LindormStore vector store implementation Add Aliyun Lindorm vector store support with hybrid search and custom routing features. - Add LindormStore class with lvector engine - Map distance metrics (cosine->cosinesimil, inner_product->innerproduct) - Add _source to query for Lindorm compatibility - Add integration tests with real Lindorm connection - Export LindormStore in rag module --- src/agentscope/rag/__init__.py | 2 + src/agentscope/rag/_store/__init__.py | 2 + src/agentscope/rag/_store/_lindorm_store.py | 381 ++++++++++++++++++++ tests/rag_store_test.py | 87 ++++- 4 files changed, 471 insertions(+), 1 deletion(-) create mode 100644 src/agentscope/rag/_store/_lindorm_store.py diff --git a/src/agentscope/rag/__init__.py b/src/agentscope/rag/__init__.py index 66f82489e5..85d63c979b 100644 --- a/src/agentscope/rag/__init__.py +++ b/src/agentscope/rag/__init__.py @@ -16,6 +16,7 @@ VDBStoreBase, QdrantStore, MilvusLiteStore, + LindormStore, ) from ._knowledge_base import KnowledgeBase from ._simple_knowledge import SimpleKnowledge @@ -32,6 +33,7 @@ "VDBStoreBase", "QdrantStore", "MilvusLiteStore", + "LindormStore", "KnowledgeBase", "SimpleKnowledge", ] diff --git a/src/agentscope/rag/_store/__init__.py b/src/agentscope/rag/_store/__init__.py index ceebe517ec..1cc92f26ed 100644 --- a/src/agentscope/rag/_store/__init__.py +++ b/src/agentscope/rag/_store/__init__.py @@ -6,9 +6,11 @@ ) from ._qdrant_store import QdrantStore from ._milvuslite_store import MilvusLiteStore +from ._lindorm_store import LindormStore __all__ = [ "VDBStoreBase", "QdrantStore", "MilvusLiteStore", + "LindormStore", ] diff --git a/src/agentscope/rag/_store/_lindorm_store.py b/src/agentscope/rag/_store/_lindorm_store.py new file mode 100644 index 0000000000..c4a0a30577 --- /dev/null +++ b/src/agentscope/rag/_store/_lindorm_store.py @@ 
-0,0 +1,381 @@ +# -*- coding: utf-8 -*- +"""The Lindorm vector store implementation.""" +import json +from typing import Any, Literal, TYPE_CHECKING + +from .._reader import Document +from ._store_base import VDBStoreBase +from .._document import DocMetadata +from ..._utils._common import _map_text_to_uuid +from ...types import Embedding + +if TYPE_CHECKING: + from opensearchpy import OpenSearch +else: + OpenSearch = "opensearchpy.OpenSearch" + + +class LindormStore(VDBStoreBase): + """The Lindorm vector store implementation, supporting Aliyun Lindorm + vector engine with hybrid search and custom routing. + + .. note:: Lindorm uses OpenSearch-compatible API. We store metadata in + document fields including doc_id, chunk_id, and content. + + """ + + def __init__( + self, + hosts: list[str], + index_name: str, + dimensions: int, + http_auth: tuple[str, str] | None = None, + use_ssl: bool = False, + verify_certs: bool = False, + distance_metric: Literal["l2", "cosine", "inner_product"] = "cosine", + enable_routing: bool = False, + enable_hybrid_search: bool = True, + rrf_rank_constant: int = 2, + rrf_knn_weight_factor: float = 0.5, + client_kwargs: dict[str, Any] | None = None, + index_kwargs: dict[str, Any] | None = None, + ) -> None: + """Initialize the Lindorm vector store. + + Args: + hosts (`list[str]`): + List of Lindorm hosts, e.g., ["http://lindorm-host:9200"]. + index_name (`str`): + The name of the index to store embeddings. + dimensions (`int`): + The dimension of the embeddings. + http_auth (`tuple[str, str] | None`, optional): + HTTP authentication (username, password) tuple. + use_ssl (`bool`, defaults to False): + Whether to use SSL/TLS connection. + verify_certs (`bool`, defaults to False): + Whether to verify SSL certificates. + distance_metric (`Literal["l2", "cosine", "inner_product"]`, \ + defaults to "cosine"): + The distance metric for vector similarity. 
+ enable_routing (`bool`, defaults to False): + Whether to enable custom routing for data isolation. + enable_hybrid_search (`bool`, defaults to True): + Whether to enable full-text and vector hybrid search. + rrf_rank_constant (`int`, defaults to 2): + RRF rank constant for hybrid search fusion. + rrf_knn_weight_factor (`float`, defaults to 0.5): + Weight factor for KNN in hybrid search. + client_kwargs (`dict[str, Any] | None`, optional): + Additional keyword arguments for OpenSearch client. + index_kwargs (`dict[str, Any] | None`, optional): + Additional keyword arguments for index creation. + """ + + try: + from opensearchpy import OpenSearch + except ImportError as e: + raise ImportError( + "opensearch-py is not installed. Please install it with " + "`pip install opensearch-py`.", + ) from e + + client_kwargs = client_kwargs or {} + self._client = OpenSearch( + hosts=hosts, + http_auth=http_auth, + use_ssl=use_ssl, + verify_certs=verify_certs, + **client_kwargs, + ) + + self.index_name = index_name + self.dimensions = dimensions + self.distance_metric = distance_metric + self.enable_routing = enable_routing + self.enable_hybrid_search = enable_hybrid_search + self.rrf_rank_constant = rrf_rank_constant + self.rrf_knn_weight_factor = rrf_knn_weight_factor + self.index_kwargs = index_kwargs or {} + + def _create_index_body(self) -> dict[str, Any]: + """Create the index body configuration for Lindorm.""" + knn_settings: dict[str, Any] = {} + if self.enable_routing: + knn_settings["knn_routing"] = True + + # Map distance metric to Lindorm's space_type + space_type_map = { + "l2": "l2", + "cosine": "cosinesimil", + "inner_product": "innerproduct", + } + lvector_space_type = space_type_map.get( + self.distance_metric, + self.distance_metric, + ) + + index_body = { + "settings": { + "index": { + "number_of_shards": 1, + "number_of_replicas": 0, + "knn": True, + **knn_settings, + }, + **self.index_kwargs.get("settings", {}), + }, + "mappings": { + "_source": 
{"excludes": ["vector"]}, + "properties": { + "vector": { + "type": "knn_vector", + "dimension": self.dimensions, + "method": { + "engine": "lvector", + "name": "hnsw", + "space_type": lvector_space_type, + }, + }, + "doc_id": {"type": "keyword"}, + "chunk_id": {"type": "integer"}, + "content": {"type": "object", "enabled": False}, + "total_chunks": {"type": "integer"}, + }, + **self.index_kwargs.get("mappings", {}), + }, + } + + return index_body + + async def _validate_index(self) -> None: + """Validate the index exists, if not, create it.""" + if not self._client.indices.exists(index=self.index_name): + index_body = self._create_index_body() + self._client.indices.create( + index=self.index_name, + body=index_body, + ) + + async def add(self, documents: list[Document], **kwargs: Any) -> None: + """Add embeddings to the Lindorm vector store. + + Args: + documents (`list[Document]`): + A list of documents to be added to the Lindorm store. + **kwargs (`Any`): + Additional arguments: + - routing (`str`): Custom routing key for data isolation. 
+ """ + await self._validate_index() + + routing = kwargs.get("routing", None) + + for doc in documents: + unique_string = json.dumps( + { + "doc_id": doc.metadata.doc_id, + "chunk_id": doc.metadata.chunk_id, + }, + ensure_ascii=False, + ) + doc_id = _map_text_to_uuid(unique_string) + + body = { + "vector": doc.embedding, + "doc_id": doc.metadata.doc_id, + "chunk_id": doc.metadata.chunk_id, + "content": doc.metadata.content, + "total_chunks": doc.metadata.total_chunks, + } + + index_params: dict[str, Any] = { + "index": self.index_name, + "id": doc_id, + "body": body, + } + + if self.enable_routing and routing: + index_params["routing"] = routing + + self._client.index(**index_params) + + self._client.indices.refresh(index=self.index_name) + + async def search( + self, + query_embedding: Embedding, + limit: int, + score_threshold: float | None = None, + **kwargs: Any, + ) -> list[Document]: + """Search relevant documents from the Lindorm vector store. + + Args: + query_embedding (`Embedding`): + The embedding of the query text. + limit (`int`): + The number of relevant documents to retrieve. + score_threshold (`float | None`, optional): + The threshold of the score to filter results. + **kwargs (`Any`): + Additional arguments: + - routing (`str`): Custom routing key for targeted search. + - query_text (`str`): Text query for hybrid search. + - scalar_filters (`list[dict]`): Scalar filters for filtering. + - filter_type (`str`): Filter type, defaults to \ + "efficient_filter". 
+ """ + routing = kwargs.get("routing", None) + query_text = kwargs.get("query_text", None) + scalar_filters = kwargs.get("scalar_filters", None) + filter_type = kwargs.get("filter_type", "efficient_filter") + + knn_query: dict[str, Any] = { + "vector": query_embedding, + "k": limit, + } + + if self.enable_hybrid_search and (query_text or scalar_filters): + filter_conditions = [] + + if query_text: + filter_conditions.append( + { + "bool": { + "must": [ + { + "match": { + "content": { + "query": query_text, + }, + }, + }, + ], + }, + }, + ) + + if scalar_filters: + filter_conditions.append( + { + "bool": { + "filter": scalar_filters, + }, + }, + ) + + if len(filter_conditions) == 1: + knn_query["filter"] = filter_conditions[0] + else: + knn_query["filter"] = { + "bool": { + "must": filter_conditions, + }, + } + + query_body = { + "size": limit, + "query": { + "knn": { + "vector": knn_query, + }, + }, + "ext": { + "lvector": { + "hybrid_search_type": "filter_rrf", + "rrf_rank_constant": str(self.rrf_rank_constant), + "rrf_knn_weight_factor": str(self.rrf_knn_weight_factor), + }, + }, + } + + if scalar_filters: + query_body["ext"]["lvector"]["filter_type"] = filter_type + else: + query_body = { + "size": limit, + "query": { + "knn": { + "vector": knn_query, + }, + }, + } + + # Add _source to retrieve document fields + query_body["_source"] = True + + search_params: dict[str, Any] = { + "index": self.index_name, + "body": query_body, + } + + if self.enable_routing and routing: + search_params["routing"] = routing + + response = self._client.search(**search_params) + + collected_res = [] + for hit in response["hits"]["hits"]: + score = hit["_score"] + + if score_threshold is not None and score < score_threshold: + continue + + source = hit.get("_source", {}) + if not source: + # Lindorm might return fields directly without _source + source = hit + + doc_metadata = DocMetadata( + content=source.get("content", {}), + doc_id=source.get("doc_id", ""), + 
chunk_id=source.get("chunk_id", 0), + total_chunks=source.get("total_chunks", 0), + ) + + collected_res.append( + Document( + embedding=source.get("vector"), + score=score, + metadata=doc_metadata, + ), + ) + + return collected_res + + async def delete(self, *args: Any, **kwargs: Any) -> None: + """Delete documents from the Lindorm vector store. + + Args: + **kwargs (`Any`): + - doc_ids (`list[str]`): List of document IDs to delete. + - routing (`str`): Custom routing key. + """ + doc_ids = kwargs.get("doc_ids", []) + routing = kwargs.get("routing", None) + + if not doc_ids: + raise ValueError("doc_ids must be provided for deletion.") + + for doc_id in doc_ids: + delete_params: dict[str, Any] = { + "index": self.index_name, + "id": doc_id, + } + + if self.enable_routing and routing: + delete_params["routing"] = routing + + self._client.delete(**delete_params) + + self._client.indices.refresh(index=self.index_name) + + def get_client(self) -> OpenSearch: + """Get the underlying OpenSearch client for Lindorm. + + Returns: + `OpenSearch`: + The underlying OpenSearch client. 
+ """ + return self._client diff --git a/tests/rag_store_test.py b/tests/rag_store_test.py index afbd083726..87b5071cc0 100644 --- a/tests/rag_store_test.py +++ b/tests/rag_store_test.py @@ -2,6 +2,7 @@ """Test the RAG store implementations.""" import os from unittest import IsolatedAsyncioTestCase +from unittest.mock import MagicMock, patch, AsyncMock from agentscope.message import TextBlock from agentscope.rag import ( @@ -9,6 +10,7 @@ Document, DocMetadata, MilvusLiteStore, + LindormStore, ) @@ -130,6 +132,89 @@ async def test_milvus_lite_store(self) -> None: async def asyncTearDown(self) -> None: """Clean up after tests.""" - # Remove Milvus Lite database file if os.path.exists("./milvus_demo.db"): os.remove("./milvus_demo.db") + + @patch("opensearchpy.OpenSearch") + async def test_lindorm_store(self, mock_opensearch_class: MagicMock) -> None: + """Test the LindormStore implementation.""" + mock_client = MagicMock() + mock_opensearch_class.return_value = mock_client + + mock_client.indices.exists.return_value = False + mock_client.indices.create.return_value = {"acknowledged": True} + mock_client.index.return_value = {"result": "created"} + mock_client.indices.refresh.return_value = {"_shards": {"successful": 1}} + mock_client.search.return_value = { + "hits": { + "hits": [ + { + "_score": 0.95, + "_source": { + "vector": [0.1, 0.2, 0.3], + "doc_id": "doc1", + "chunk_id": 0, + "content": "This is a test document.", + "total_chunks": 2, + }, + }, + ], + }, + } + + store = LindormStore( + hosts=["http://localhost:9200"], + index_name="test_index", + dimensions=3, + http_auth=("user", "pass"), + enable_routing=True, + enable_hybrid_search=True, + ) + + await store.add( + [ + Document( + embedding=[0.1, 0.2, 0.3], + metadata=DocMetadata( + content=TextBlock( + type="text", + text="This is a test document.", + ), + doc_id="doc1", + chunk_id=0, + total_chunks=2, + ), + ), + ], + routing="user123", + ) + + mock_client.indices.create.assert_called_once() + 
self.assertTrue(mock_client.index.called) + + res = await store.search( + query_embedding=[0.15, 0.25, 0.35], + limit=3, + score_threshold=0.9, + routing="user123", + query_text="test document", + scalar_filters=[{"range": {"total_chunks": {"gt": 1}}}], + ) + + self.assertEqual(len(res), 1) + self.assertEqual(res[0].score, 0.95) + self.assertEqual( + res[0].metadata.content, + "This is a test document.", + ) + + call_args = mock_client.search.call_args + query_body = call_args[1]["body"] + self.assertIn("ext", query_body) + self.assertIn("lvector", query_body["ext"]) + self.assertEqual( + query_body["ext"]["lvector"]["hybrid_search_type"], + "filter_rrf", + ) + self.assertIn("filter", query_body["query"]["knn"]["vector"]) + From a9bcd2f23672907f23d817ccd15c086bb1cf82ae Mon Sep 17 00:00:00 2001 From: jiangzhijie Date: Sun, 4 Jan 2026 17:20:45 +0800 Subject: [PATCH 2/3] fix pre-commit check error --- src/agentscope/rag/_store/_lindorm_store.py | 123 +++----------------- tests/rag_store_test.py | 24 ++-- 2 files changed, 23 insertions(+), 124 deletions(-) diff --git a/src/agentscope/rag/_store/_lindorm_store.py b/src/agentscope/rag/_store/_lindorm_store.py index c4a0a30577..f748fde3d7 100644 --- a/src/agentscope/rag/_store/_lindorm_store.py +++ b/src/agentscope/rag/_store/_lindorm_store.py @@ -17,7 +17,7 @@ class LindormStore(VDBStoreBase): """The Lindorm vector store implementation, supporting Aliyun Lindorm - vector engine with hybrid search and custom routing. + vector engine with vector similarity search and custom routing. .. note:: Lindorm uses OpenSearch-compatible API. We store metadata in document fields including doc_id, chunk_id, and content. 
@@ -30,15 +30,8 @@ def __init__( index_name: str, dimensions: int, http_auth: tuple[str, str] | None = None, - use_ssl: bool = False, - verify_certs: bool = False, distance_metric: Literal["l2", "cosine", "inner_product"] = "cosine", enable_routing: bool = False, - enable_hybrid_search: bool = True, - rrf_rank_constant: int = 2, - rrf_knn_weight_factor: float = 0.5, - client_kwargs: dict[str, Any] | None = None, - index_kwargs: dict[str, Any] | None = None, ) -> None: """Initialize the Lindorm vector store. @@ -51,25 +44,11 @@ def __init__( The dimension of the embeddings. http_auth (`tuple[str, str] | None`, optional): HTTP authentication (username, password) tuple. - use_ssl (`bool`, defaults to True): - Whether to use SSL/TLS connection. - verify_certs (`bool`, defaults to True): - Whether to verify SSL certificates. distance_metric (`Literal["l2", "cosine", "inner_product"]`, \ defaults to "cosine"): The distance metric for vector similarity. enable_routing (`bool`, defaults to False): Whether to enable custom routing for data isolation. - enable_hybrid_search (`bool`, defaults to False): - Whether to enable full-text and vector hybrid search. - rrf_rank_constant (`int`, defaults to 60): - RRF rank constant for hybrid search fusion. - rrf_knn_weight_factor (`float`, defaults to 1.0): - Weight factor for KNN in hybrid search. - client_kwargs (`dict[str, Any] | None`, optional): - Additional keyword arguments for OpenSearch client. - index_kwargs (`dict[str, Any] | None`, optional): - Additional keyword arguments for index creation. 
""" try: @@ -80,23 +59,17 @@ def __init__( "`pip install opensearch-py`.", ) from e - client_kwargs = client_kwargs or {} self._client = OpenSearch( hosts=hosts, http_auth=http_auth, - use_ssl=use_ssl, - verify_certs=verify_certs, - **client_kwargs, + use_ssl=False, + verify_certs=False, ) self.index_name = index_name self.dimensions = dimensions self.distance_metric = distance_metric self.enable_routing = enable_routing - self.enable_hybrid_search = enable_hybrid_search - self.rrf_rank_constant = rrf_rank_constant - self.rrf_knn_weight_factor = rrf_knn_weight_factor - self.index_kwargs = index_kwargs or {} def _create_index_body(self) -> dict[str, Any]: """Create the index body configuration for Lindorm.""" @@ -123,7 +96,6 @@ def _create_index_body(self) -> dict[str, Any]: "knn": True, **knn_settings, }, - **self.index_kwargs.get("settings", {}), }, "mappings": { "_source": {"excludes": ["vector"]}, @@ -142,7 +114,6 @@ def _create_index_body(self) -> dict[str, Any]: "content": {"type": "object", "enabled": False}, "total_chunks": {"type": "integer"}, }, - **self.index_kwargs.get("mappings", {}), }, } @@ -221,89 +192,21 @@ async def search( **kwargs (`Any`): Additional arguments: - routing (`str`): Custom routing key for targeted search. - - query_text (`str`): Text query for hybrid search. - - scalar_filters (`list[dict]`): Scalar filters for filtering. - - filter_type (`str`): Filter type, defaults to \ - "efficient_filter". 
""" routing = kwargs.get("routing", None) - query_text = kwargs.get("query_text", None) - scalar_filters = kwargs.get("scalar_filters", None) - filter_type = kwargs.get("filter_type", "efficient_filter") - knn_query: dict[str, Any] = { - "vector": query_embedding, - "k": limit, - } - - if self.enable_hybrid_search and (query_text or scalar_filters): - filter_conditions = [] - - if query_text: - filter_conditions.append( - { - "bool": { - "must": [ - { - "match": { - "content": { - "query": query_text, - }, - }, - }, - ], - }, - }, - ) - - if scalar_filters: - filter_conditions.append( - { - "bool": { - "filter": scalar_filters, - }, - }, - ) - - if len(filter_conditions) == 1: - knn_query["filter"] = filter_conditions[0] - else: - knn_query["filter"] = { - "bool": { - "must": filter_conditions, - }, - } - - query_body = { - "size": limit, - "query": { - "knn": { - "vector": knn_query, - }, - }, - "ext": { - "lvector": { - "hybrid_search_type": "filter_rrf", - "rrf_rank_constant": str(self.rrf_rank_constant), - "rrf_knn_weight_factor": str(self.rrf_knn_weight_factor), - }, - }, - } - - if scalar_filters: - query_body["ext"]["lvector"]["filter_type"] = filter_type - else: - query_body = { - "size": limit, - "query": { - "knn": { - "vector": knn_query, + query_body = { + "size": limit, + "query": { + "knn": { + "vector": { + "vector": query_embedding, + "k": limit, }, }, - } - - # Add _source to retrieve document fields - query_body["_source"] = True + }, + "_source": True, + } search_params: dict[str, Any] = { "index": self.index_name, diff --git a/tests/rag_store_test.py b/tests/rag_store_test.py index 87b5071cc0..26db91ec81 100644 --- a/tests/rag_store_test.py +++ b/tests/rag_store_test.py @@ -2,7 +2,7 @@ """Test the RAG store implementations.""" import os from unittest import IsolatedAsyncioTestCase -from unittest.mock import MagicMock, patch, AsyncMock +from unittest.mock import MagicMock, patch from agentscope.message import TextBlock from agentscope.rag import 
( @@ -136,7 +136,10 @@ async def asyncTearDown(self) -> None: os.remove("./milvus_demo.db") @patch("opensearchpy.OpenSearch") - async def test_lindorm_store(self, mock_opensearch_class: MagicMock) -> None: + async def test_lindorm_store( + self, + mock_opensearch_class: MagicMock, + ) -> None: """Test the LindormStore implementation.""" mock_client = MagicMock() mock_opensearch_class.return_value = mock_client @@ -144,7 +147,9 @@ async def test_lindorm_store(self, mock_opensearch_class: MagicMock) -> None: mock_client.indices.exists.return_value = False mock_client.indices.create.return_value = {"acknowledged": True} mock_client.index.return_value = {"result": "created"} - mock_client.indices.refresh.return_value = {"_shards": {"successful": 1}} + mock_client.indices.refresh.return_value = { + "_shards": {"successful": 1} + } mock_client.search.return_value = { "hits": { "hits": [ @@ -168,7 +173,6 @@ async def test_lindorm_store(self, mock_opensearch_class: MagicMock) -> None: dimensions=3, http_auth=("user", "pass"), enable_routing=True, - enable_hybrid_search=True, ) await store.add( @@ -197,8 +201,6 @@ async def test_lindorm_store(self, mock_opensearch_class: MagicMock) -> None: limit=3, score_threshold=0.9, routing="user123", - query_text="test document", - scalar_filters=[{"range": {"total_chunks": {"gt": 1}}}], ) self.assertEqual(len(res), 1) @@ -210,11 +212,5 @@ async def test_lindorm_store(self, mock_opensearch_class: MagicMock) -> None: call_args = mock_client.search.call_args query_body = call_args[1]["body"] - self.assertIn("ext", query_body) - self.assertIn("lvector", query_body["ext"]) - self.assertEqual( - query_body["ext"]["lvector"]["hybrid_search_type"], - "filter_rrf", - ) - self.assertIn("filter", query_body["query"]["knn"]["vector"]) - + self.assertEqual(query_body["size"], 3) + self.assertIn("knn", query_body["query"]) From c64c2d52e8e4e14c5d085ae1bc189a7bd400a8b8 Mon Sep 17 00:00:00 2001 From: jiangzhijie Date: Fri, 9 Jan 2026 11:47:15 +0800 
Subject: [PATCH 3/3] fix review issues --- pyproject.toml | 2 + src/agentscope/rag/_store/_lindorm_store.py | 61 ++++++++++++++++----- tests/rag_store_test.py | 37 +++++++++++-- 3 files changed, 81 insertions(+), 19 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 4a351bd232..ef27c611af 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,6 +69,8 @@ full = [ # The qdrant-client >= 1.16.0 has conflicts with pymilvus, so we fix # the version to 1.15.1 here. "qdrant-client==1.15.1", + # Lindorm vector store (OpenSearch-compatible) + "opensearch-py>=2.0.0", ] dev = [ diff --git a/src/agentscope/rag/_store/_lindorm_store.py b/src/agentscope/rag/_store/_lindorm_store.py index f748fde3d7..c7bf1ca086 100644 --- a/src/agentscope/rag/_store/_lindorm_store.py +++ b/src/agentscope/rag/_store/_lindorm_store.py @@ -29,9 +29,11 @@ def __init__( hosts: list[str], index_name: str, dimensions: int, - http_auth: tuple[str, str] | None = None, + http_auth: tuple[str, str], distance_metric: Literal["l2", "cosine", "inner_product"] = "cosine", enable_routing: bool = False, + use_ssl: bool = False, + verify_certs: bool = False, ) -> None: """Initialize the Lindorm vector store. @@ -42,13 +44,18 @@ def __init__( The name of the index to store embeddings. dimensions (`int`): The dimension of the embeddings. - http_auth (`tuple[str, str] | None`, optional): - HTTP authentication (username, password) tuple. + http_auth (`tuple[str, str]`): + HTTP authentication (username, password) tuple. Required + for Aliyun Lindorm cloud service. distance_metric (`Literal["l2", "cosine", "inner_product"]`, \ defaults to "cosine"): The distance metric for vector similarity. enable_routing (`bool`, defaults to False): Whether to enable custom routing for data isolation. + use_ssl (`bool`, defaults to False): + Whether to use SSL/TLS for the connection. + verify_certs (`bool`, defaults to False): + Whether to verify SSL certificates. 
""" try: @@ -62,8 +69,9 @@ def __init__( self._client = OpenSearch( hosts=hosts, http_auth=http_auth, - use_ssl=False, - verify_certs=False, + use_ssl=use_ssl, + verify_certs=verify_certs, + ssl_show_warn=False, ) self.index_name = index_name @@ -72,7 +80,13 @@ def __init__( self.enable_routing = enable_routing def _create_index_body(self) -> dict[str, Any]: - """Create the index body configuration for Lindorm.""" + """Create the index body configuration for Lindorm. + + Returns: + `dict[str, Any]`: + The index configuration body including settings and mappings + for vector storage with Lindorm's lvector engine. + """ knn_settings: dict[str, Any] = {} if self.enable_routing: knn_settings["knn_routing"] = True @@ -120,7 +134,16 @@ def _create_index_body(self) -> dict[str, Any]: return index_body async def _validate_index(self) -> None: - """Validate the index exists, if not, create it.""" + """Validate the index exists, and create it if not. + + This method checks if the index exists in Lindorm. If the index + does not exist, it will be created with the appropriate vector + configuration. + + Raises: + Exception: If index creation fails due to connection issues + or invalid configuration. + """ if not self._client.indices.exists(index=self.index_name): index_body = self._create_index_body() self._client.indices.create( @@ -247,17 +270,27 @@ async def search( return collected_res - async def delete(self, *args: Any, **kwargs: Any) -> None: + async def delete( + self, + doc_ids: list[str], + routing: str | None = None, + ) -> None: """Delete documents from the Lindorm vector store. Args: - **kwargs (`Any`): - - doc_ids (`list[str]`): List of document IDs to delete. - - routing (`str`): Custom routing key. + doc_ids (`list[str]`): + List of internal document UUIDs to delete. 
These values must + match the index document IDs generated during :meth:`add` by + combining ``doc.metadata.doc_id`` and + ``doc.metadata.chunk_id``, and are not the original + ``doc.metadata.doc_id`` values. + routing (`str | None`, optional): + Custom routing key for targeted deletion when routing is + enabled. Defaults to None. + + Raises: + ValueError: If ``doc_ids`` is empty. """ - doc_ids = kwargs.get("doc_ids", []) - routing = kwargs.get("routing", None) - if not doc_ids: raise ValueError("doc_ids must be provided for deletion.") diff --git a/tests/rag_store_test.py b/tests/rag_store_test.py index 26db91ec81..3b33e09bef 100644 --- a/tests/rag_store_test.py +++ b/tests/rag_store_test.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- """Test the RAG store implementations.""" +import json import os from unittest import IsolatedAsyncioTestCase from unittest.mock import MagicMock, patch @@ -12,6 +13,7 @@ MilvusLiteStore, LindormStore, ) +from agentscope._utils._common import _map_text_to_uuid class RAGStoreTest(IsolatedAsyncioTestCase): @@ -147,9 +149,7 @@ async def test_lindorm_store( mock_client.indices.exists.return_value = False mock_client.indices.create.return_value = {"acknowledged": True} mock_client.index.return_value = {"result": "created"} - mock_client.indices.refresh.return_value = { - "_shards": {"successful": 1} - } + mock_client.indices.refresh.return_value = {"_shards": {"successful": 1}} mock_client.search.return_value = { "hits": { "hits": [ @@ -159,7 +159,10 @@ async def test_lindorm_store( "vector": [0.1, 0.2, 0.3], "doc_id": "doc1", "chunk_id": 0, - "content": "This is a test document.", + "content": { + "type": "text", + "text": "This is a test document.", + }, "total_chunks": 2, }, }, @@ -206,7 +209,7 @@ async def test_lindorm_store( self.assertEqual(len(res), 1) self.assertEqual(res[0].score, 0.95) self.assertEqual( - res[0].metadata.content, + res[0].metadata.content["text"], "This is a test document.", ) @@ -214,3 +217,27 @@ async def 
test_lindorm_store( query_body = call_args[1]["body"] self.assertEqual(query_body["size"], 3) self.assertIn("knn", query_body["query"]) + + # Test delete + mock_client.delete.return_value = {"result": "deleted"} + mock_client.indices.refresh.return_value = { + "_shards": {"successful": 1}, + } + + # Generate a doc_id similar to how add() does it + unique_string = json.dumps( + {"doc_id": "doc1", "chunk_id": 0}, + ensure_ascii=False, + ) + doc_id_to_delete = _map_text_to_uuid(unique_string) + + await store.delete(doc_ids=[doc_id_to_delete], routing="user123") + + self.assertTrue(mock_client.delete.called) + delete_call_args = mock_client.delete.call_args + self.assertEqual( + delete_call_args[1]["id"], + doc_id_to_delete, + ) + self.assertEqual( + delete_call_args[1]["routing"], + "user123", + )