Add alibaba-cloud vdb lindorm as a vdb choice. #1075
base: main
Conversation
Add Aliyun Lindorm vector store support with hybrid search and custom routing features.
- Add LindormStore class with lvector engine
- Map distance metrics (cosine -> cosinesimil, inner_product -> innerproduct)
- Add _source to query for Lindorm compatibility
- Add integration tests with real Lindorm connection
- Export LindormStore in rag module
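The distance-metric mapping mentioned above can be sketched as a small helper. The function and dict names here are illustrative, not the actual identifiers in the PR; only the two metric pairs come from the description.

```python
# Hypothetical mapping of AgentScope metric names to the space types that
# Lindorm's OpenSearch-compatible vector engine expects, as described in
# the PR: cosine -> cosinesimil, inner_product -> innerproduct.
_METRIC_MAP = {
    "cosine": "cosinesimil",
    "inner_product": "innerproduct",
}

def map_metric(name: str) -> str:
    """Translate an AgentScope metric name to a Lindorm space type."""
    try:
        return _METRIC_MAP[name]
    except KeyError:
        raise ValueError(f"Unsupported distance metric: {name}")
```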
* main:
  - chore(version): update version to 1.0.11 (agentscope-ai#1074)
  - fix(anthropic): fix the bug that anthropic response may have text field even for the tool_use block. (agentscope-ai#1070)
  - fix(Gemini): fix the bug that Gemini LLMs doesn't support nested JSON schema in its tools API (agentscope-ai#1050)
  - fix(mcp): support to raise the exception on MCP disconnection (agentscope-ai#1024)
  - feat(a2a): support A2A protocol for inter-agent communication (agentscope-ai#1027)
DavdGao
left a comment
Please see inline comments.
Pull request overview
This PR adds support for Alibaba Cloud VDB Lindorm as a new vector database option for AgentScope's RAG functionality. Lindorm uses the OpenSearch-compatible API and supports vector similarity search with custom routing for data isolation.
Key changes:
- Implements LindormStore class with full CRUD operations (add, search, delete) for vector storage
- Adds comprehensive unit tests with mocked OpenSearch client
- Exposes LindormStore through the RAG module's public API
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| src/agentscope/rag/_store/_lindorm_store.py | New vector store implementation for Lindorm with OpenSearch-compatible API, supporting vector similarity search, custom routing, and configurable distance metrics |
| src/agentscope/rag/_store/__init__.py | Exports LindormStore class from the store module |
| src/agentscope/rag/__init__.py | Exposes LindormStore in the public RAG API |
| tests/rag_store_test.py | Adds unit test for LindormStore with mocked OpenSearch client testing add and search operations |
Resolved conflicts:
- pyproject.toml: kept both mysql-connector-python and opensearch-py
- src/agentscope/rag/__init__.py: export both AlibabaCloudMySQLStore and LindormStore
- src/agentscope/rag/_store/__init__.py: import both stores
- tests/rag_store_test.py: added test_alibabacloud_mysql_store from main
I have fixed the problems.

/gemini review
Code Review
This pull request introduces support for Alibaba Cloud Lindorm as a vector database store. The implementation is based on the opensearch-py library, which is appropriate given Lindorm's OpenSearch compatibility.
My review has identified several critical issues related to the use of synchronous client calls within asynchronous methods, which will block the asyncio event loop and cause significant performance problems. I have provided suggestions to fix these by using asyncio.to_thread. Additionally, I've included recommendations to improve performance by using the bulk APIs for adding and deleting documents, which is a best practice for batch operations. There is also a medium-severity issue with the search result parsing logic that could lead to bugs.
Overall, the contribution is valuable, but the identified issues, especially the blocking calls, must be addressed before merging.
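The blocking-call concern can be illustrated with a minimal, self-contained sketch. The `blocking_io` function below is a stand-in for a synchronous opensearch-py call such as `client.search(...)`, not part of the PR:

```python
import asyncio
import time

def blocking_io(x: int) -> int:
    # Stand-in for a synchronous, blocking opensearch-py client call.
    time.sleep(0.01)
    return x * 2

async def main() -> list[int]:
    # asyncio.to_thread runs each blocking call in a worker thread,
    # so the event loop stays free to schedule other coroutines.
    return list(
        await asyncio.gather(
            asyncio.to_thread(blocking_io, 1),
            asyncio.to_thread(blocking_io, 2),
        ),
    )

print(asyncio.run(main()))  # [2, 4]
```

Calling `blocking_io` directly inside a coroutine would instead stall every other task on the loop for the duration of the I/O.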
```python
if not self._client.indices.exists(index=self.index_name):
    index_body = self._create_index_body()
    self._client.indices.create(
        index=self.index_name,
        body=index_body,
    )
```
The opensearch-py client's methods (indices.exists, indices.create) are synchronous and perform blocking I/O. Calling them directly inside an async method will block the event loop, causing performance degradation. These calls should be wrapped in await asyncio.to_thread(...) to run them in a background thread. You'll need to import asyncio at the top of the file.
```python
if not await asyncio.to_thread(
    self._client.indices.exists, index=self.index_name
):
    index_body = self._create_index_body()
    await asyncio.to_thread(
        self._client.indices.create,
        index=self.index_name,
        body=index_body,
    )
```

```python
if self.enable_routing and routing:
    index_params["routing"] = routing

self._client.index(**index_params)
```
```python
self._client.index(**index_params)

self._client.indices.refresh(index=self.index_name)
```
```python
if self.enable_routing and routing:
    search_params["routing"] = routing

response = self._client.search(**search_params)
```
```python
if self.enable_routing and routing:
    delete_params["routing"] = routing

self._client.delete(**delete_params)
```
```python
self._client.delete(**delete_params)

self._client.indices.refresh(index=self.index_name)
```
```python
for doc in documents:
    unique_string = json.dumps(
        {
            "doc_id": doc.metadata.doc_id,
            "chunk_id": doc.metadata.chunk_id,
        },
        ensure_ascii=False,
    )
    doc_id = _map_text_to_uuid(unique_string)

    body = {
        "vector": doc.embedding,
        "doc_id": doc.metadata.doc_id,
        "chunk_id": doc.metadata.chunk_id,
        "content": doc.metadata.content,
        "total_chunks": doc.metadata.total_chunks,
    }

    index_params: dict[str, Any] = {
        "index": self.index_name,
        "id": doc_id,
        "body": body,
    }

    if self.enable_routing and routing:
        index_params["routing"] = routing

    self._client.index(**index_params)
```
Indexing documents one by one in a loop is inefficient and can lead to poor performance, especially with a large number of documents. It's highly recommended to use the OpenSearch Bulk API (opensearchpy.helpers.bulk) to add multiple documents in a single request. This will significantly improve performance by reducing network overhead.
That's a good suggestion, which will also improve the user experience for Aliyun Lindorm!
```python
for doc_id in doc_ids:
    delete_params: dict[str, Any] = {
        "index": self.index_name,
        "id": doc_id,
    }

    if self.enable_routing and routing:
        delete_params["routing"] = routing

    self._client.delete(**delete_params)
```
Similar to the above: use the Bulk API here as well.
```python
if not source:
    # Lindorm might return fields directly without _source
    source = hit
```
The fallback logic source = hit is unsafe. The hit object contains metadata fields (_score, _index, etc.) besides the document source. Assigning hit to source will cause subsequent source.get() calls to read from the wrong data structure. Since _source: True is specified in the query, hit["_source"] should always be present. If it's missing, it's safer to skip the hit to avoid unexpected behavior.
```python
if not source:
    # The comment suggests Lindorm might return fields directly, but the
    # fallback `source = hit` is unsafe. Since `_source` is explicitly
    # requested, we should skip hits without it to avoid errors.
    continue
```
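To make the skip-instead-of-fallback behavior concrete, here is a small, self-contained parsing sketch over the standard OpenSearch response shape. The `parse_hits` function name and the returned dict layout are assumptions for illustration, not the PR's actual code:

```python
from typing import Any

def parse_hits(response: dict[str, Any]) -> list[dict[str, Any]]:
    """Extract documents from an OpenSearch-style search response.

    Hits missing `_source` are skipped rather than falling back to the
    raw hit, which would mix in metadata fields like _score and _index.
    """
    docs = []
    for hit in response.get("hits", {}).get("hits", []):
        source = hit.get("_source")
        if not source:
            continue  # malformed or field-less hit: skip it
        docs.append({"score": hit.get("_score"), **source})
    return docs
```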
Hi, could you please resolve the merge conflicts? I noticed the CI is failing. Syncing with the current main branch should fix the error cases.
Others LGTM
AgentScope Version
[The version of AgentScope you are working on, e.g. `import agentscope; print(agentscope.__version__)`]

Description
Add a

Checklist
Please check the following items before code is ready to be reviewed.
- `pre-commit run --all-files` command