Add improvements for the OpenSearch engine and update the docker compose files.
Signed-off-by: Navneet Verma <[email protected]>
navneet1v committed Nov 5, 2024
1 parent ea53db4 commit 3c418c7
Showing 7 changed files with 195 additions and 47 deletions.
53 changes: 36 additions & 17 deletions engine/clients/opensearch/configure.py
@@ -1,7 +1,6 @@
from opensearchpy import NotFoundError, OpenSearch

from benchmark.dataset import Dataset
from engine.base_client import IncompatibilityError
from engine.base_client.configure import BaseConfigurator
from engine.base_client.distances import Distance
from engine.clients.opensearch.config import (
@@ -10,6 +9,7 @@
OPENSEARCH_PORT,
OPENSEARCH_USER,
)
from engine.clients.opensearch.utils import get_index_thread_qty


class OpenSearchConfigurator(BaseConfigurator):
@@ -40,28 +40,36 @@ def __init__(self, host, collection_params: dict, connection_params: dict):
)

def clean(self):
try:
is_index_available = self.client.indices.exists(index=OPENSEARCH_INDEX,
params={
"timeout": 300,
})
if is_index_available:
print(f"Deleting index: {OPENSEARCH_INDEX}, as it already exists")
self.client.indices.delete(
index=OPENSEARCH_INDEX,
params={
"timeout": 300,
},
)
except NotFoundError:
pass


def recreate(self, dataset: Dataset, collection_params):
if dataset.config.distance == Distance.DOT:
raise IncompatibilityError
if dataset.config.vector_size > 1024:
raise IncompatibilityError
self._update_cluster_settings()
distance = self.DISTANCE_MAPPING[dataset.config.distance]
if dataset.config.distance == Distance.COSINE:
distance = self.DISTANCE_MAPPING[Distance.DOT]
print(f"Using distance type: {distance} as dataset distance is : {dataset.config.distance}")

self.client.indices.create(
index=OPENSEARCH_INDEX,
body={
"settings": {
"index": {
"knn": True,
"refresh_interval": -1,
"number_of_replicas": 0,
"number_of_shards": 1
}
},
"mappings": {
@@ -72,18 +80,13 @@ def recreate(self, dataset: Dataset, collection_params):
"method": {
**{
"name": "hnsw",
"engine": "lucene",
"space_type": self.DISTANCE_MAPPING[
dataset.config.distance
],
"parameters": {
"m": 16,
"ef_construction": 100,
},
"engine": "faiss",
"space_type": distance,
**collection_params.get("method")
},
**collection_params.get("method"),
},
},
# this doesn't work for nmslib; we need to see what to do here, maybe remove these fields
**self._prepare_fields_config(dataset),
}
},
@@ -94,6 +97,16 @@ def recreate(self, dataset: Dataset, collection_params):
cluster_manager_timeout="5m",
)

def _update_cluster_settings(self):
index_thread_qty = get_index_thread_qty(self.client)
cluster_settings_body = {
"persistent": {
"knn.memory.circuit_breaker.limit": "75%", # putting a higher value to ensure that even with small cluster the latencies for vector search are good
"knn.algo_param.index_thread_qty": index_thread_qty
}
}
self.client.cluster.put_settings(cluster_settings_body)

def _prepare_fields_config(self, dataset: Dataset):
return {
field_name: {
Expand All @@ -104,3 +117,9 @@ def _prepare_fields_config(self, dataset: Dataset):
}
for field_name, field_type in dataset.config.schema.items()
}

def execution_params(self, distance, vector_size) -> dict:
# normalize the vectors when cosine similarity is used
if distance == Distance.COSINE:
return {"normalize": "true"}
return {}
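
Note: the COSINE-to-DOT mapping above works because, for unit-length vectors, the inner product equals cosine similarity; execution_params signals this with {"normalize": "true"}. A minimal sketch of the normalization the harness is then expected to apply (normalize_vector is a hypothetical helper, not part of this commit):

import numpy as np

def normalize_vector(vector):
    # For unit-length vectors, inner product == cosine similarity,
    # so a COSINE dataset can be served by an innerproduct (DOT) index.
    v = np.asarray(vector, dtype=np.float32)
    norm = np.linalg.norm(v)
    return v if norm == 0 else v / norm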
21 changes: 16 additions & 5 deletions engine/clients/opensearch/search.py
@@ -5,6 +5,7 @@
from opensearchpy import OpenSearch

from dataset_reader.base_reader import Query
from engine.base_client.distances import Distance
from engine.base_client.search import BaseSearcher
from engine.clients.opensearch.config import (
OPENSEARCH_INDEX,
Expand All @@ -13,6 +14,7 @@
OPENSEARCH_USER,
)
from engine.clients.opensearch.parser import OpenSearchConditionParser
import numpy as np


class ClosableOpenSearch(OpenSearch):
@@ -44,6 +46,7 @@ def init_client(cls, host, distance, connection_params: dict, search_params: dict):
basic_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD),
**init_params,
)
cls.distance = distance
cls.search_params = search_params

@classmethod
@@ -53,6 +56,9 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]:
"vector": {
"vector": query.vector,
"k": top,
"method_parameters" : {
"ef_search": cls.search_params["config"]["ef_search"] # ef_search parameter is added in the query time
}
}
}
}
@@ -70,15 +76,20 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]:
params={
"timeout": 60,
},
_source=False,
docvalue_fields=["_id"],
stored_fields="_none_",
)

return [
(uuid.UUID(hex=hit["_id"]).int, hit["_score"])
(uuid.UUID(hex=hit["fields"]["_id"][0]).int, hit["_score"])
for hit in res["hits"]["hits"]
]

@classmethod
def setup_search(cls):
if cls.search_params:
cls.client.indices.put_settings(
body=cls.search_params["config"], index=OPENSEARCH_INDEX
)
print(f"Search Parameters: {cls.search_params}")
# Load the graphs in memory
warmup_endpoint = f'/_plugins/_knn/warmup/{OPENSEARCH_INDEX}'
print(f"Loading indices to memory: {OPENSEARCH_INDEX}")
cls.client.transport.perform_request('GET', warmup_endpoint)
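
The warmup call above loads the native faiss graph files into memory so that first-query latencies are not dominated by disk reads. A hedged sketch of checking how much graph memory is loaded after warmup, assuming the standard k-NN stats endpoint (not part of this commit):

def print_graph_memory(client):
    # Per-node native graph memory currently loaded by the k-NN plugin (in KB).
    stats = client.transport.perform_request(
        "GET", "/_plugins/_knn/stats/graph_memory_usage"
    )
    for node_id, node_stats in stats.get("nodes", {}).items():
        print(node_id, node_stats.get("graph_memory_usage"))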
34 changes: 28 additions & 6 deletions engine/clients/opensearch/upload.py
@@ -1,17 +1,20 @@
import multiprocessing as mp
import uuid
import time
from typing import List

from opensearchpy import OpenSearch

from dataset_reader.base_reader import Record
from engine.base_client.distances import Distance
from engine.base_client.upload import BaseUploader
from engine.clients.opensearch.config import (
OPENSEARCH_INDEX,
OPENSEARCH_PASSWORD,
OPENSEARCH_PORT,
OPENSEARCH_USER,
)
from engine.clients.opensearch.utils import get_index_thread_qty_for_force_merge, update_force_merge_threads


class ClosableOpenSearch(OpenSearch):
@@ -62,10 +65,29 @@ def upload_batch(cls, batch: List[Record]):

@classmethod
def post_upload(cls, _distance):
cls.client.indices.forcemerge(
index=OPENSEARCH_INDEX,
params={
"timeout": 300,
},
)
# ensure the index is refreshed before the force merge
cls._refresh_index()
cls._force_merge_index()
# ensure only force-merged segments remain
cls._refresh_index()
return {}

@classmethod
def _refresh_index(cls):
print(f"Refreshing index: {OPENSEARCH_INDEX}")
params={"timeout": 300}
cls.client.indices.refresh(index=OPENSEARCH_INDEX, params=params)

@classmethod
def _force_merge_index(cls):
index_thread_qty = get_index_thread_qty_for_force_merge(cls.client)
update_force_merge_threads(client=cls.client, index_thread_qty=index_thread_qty)
force_merge_endpoint = f'/{OPENSEARCH_INDEX}/_forcemerge?max_num_segments=1&wait_for_completion=false'
force_merge_task_id = cls.client.transport.perform_request('POST', force_merge_endpoint)['task']
FORCE_MERGE_POLL_INTERVAL_SEC = 30
print(f"Starting force merge on index: {OPENSEARCH_INDEX}, task_id: {force_merge_task_id}")
while True:
time.sleep(FORCE_MERGE_POLL_INTERVAL_SEC)
task_status = cls.client.tasks.get(task_id=force_merge_task_id)
if task_status['completed']:
break
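
Running the force merge with wait_for_completion=false and polling the tasks API avoids holding a single long HTTP request open, which would otherwise time out on large indices. A small sketch for verifying the result, assuming the cat segments API (not part of this commit):

def print_segment_count(client, index):
    # After a force merge to max_num_segments=1, each shard of the
    # index should report exactly one segment.
    segments = client.cat.segments(index=index, format="json")
    print(f"Segments remaining in {index}: {len(segments)}")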
85 changes: 85 additions & 0 deletions engine/clients/opensearch/utils.py
@@ -0,0 +1,85 @@
from opensearchpy import OpenSearch

def get_index_thread_qty_for_force_merge(client: OpenSearch):
processors_per_node = get_cores_for_data_nodes(client=client)
# since only one shard performs the merge during force merge, we can be aggressive with the parallelization factor
index_thread_qty = max(1, processors_per_node // 2)
print(f"Index thread qty for force merge: {index_thread_qty}")
return index_thread_qty

def get_index_thread_qty(client: OpenSearch):
processors_per_node = get_cores_for_data_nodes(client=client)
# since more than one shard may be indexing at the same time, we stay conservative with the parallelization factor
index_thread_qty = max(1, processors_per_node // 8)
print(f"Index thread qty for indexing: {index_thread_qty}")
return index_thread_qty


def get_cores_for_data_nodes(client: OpenSearch):
# Sample nodes info response that gets parsed:
# {
# "nodes": {
# "Or9Nm4UJR3-gcMOGwJhHHQ": {
# "roles": [
# "data",
# "ingest",
# "master",
# "remote_cluster_client"
# ],
# "os": {
# "refresh_interval_in_millis": 1000,
# "available_processors": 8,
# "allocated_processors": 8
# }
# },
# "A-cqbeekROeR3kzKhOXpRw": {
# "roles": [
# "data",
# "ingest",
# "master",
# "remote_cluster_client"
# ],
# "os": {
# "refresh_interval_in_millis": 1000,
# "available_processors": 8,
# "allocated_processors": 8
# }
# },
# "FrDs-vOMQ8yDZ0HEkDwRHA": {
# "roles": [
# "data",
# "ingest",
# "master",
# "remote_cluster_client"
# ],
# "os": {
# "refresh_interval_in_millis": 1000,
# "available_processors": 8,
# "allocated_processors": 8
# }
# }
# }
# }

nodes_stats_res = client.nodes.info(filter_path="nodes.*.roles,nodes.*.os")
nodes_data = nodes_stats_res.get("nodes")
data_node_count = 0
total_processors = 0
for node_id in nodes_data:
node_info = nodes_data.get(node_id)
roles = node_info["roles"]
os_info = node_info["os"]
if 'data' in roles:
data_node_count += 1
total_processors += int(os_info['allocated_processors'])
processors_per_node = total_processors // data_node_count
return processors_per_node


def update_force_merge_threads(client: OpenSearch, index_thread_qty=1):
cluster_settings_body = {
"persistent": {
"knn.algo_param.index_thread_qty": index_thread_qty
}
}
client.cluster.put_settings(cluster_settings_body)
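
Worked example: with the sample response above (three data nodes, 8 allocated processors each), processors_per_node = 24 // 3 = 8, so indexing uses max(1, 8 // 8) = 1 thread and force merge uses max(1, 8 // 2) = 4 threads. A hypothetical usage sketch, assuming `client` is an opensearchpy OpenSearch client for the cluster:

indexing_threads = get_index_thread_qty(client)               # max(1, 8 // 8) == 1
merge_threads = get_index_thread_qty_for_force_merge(client)  # max(1, 8 // 2) == 4
update_force_merge_threads(client=client, index_thread_qty=merge_threads)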
6 changes: 4 additions & 2 deletions engine/servers/opensearch-single-node-ci/docker-compose.yaml
@@ -2,10 +2,12 @@ version: '3.5'

services:
opensearch:
image: opensearchproject/opensearch:2.10.0
image: opensearchproject/opensearch:2.17.1
environment:
discovery.type: "single-node"
plugins.security.disabled: true
DISABLE_SECURITY_PLUGIN: true
DISABLE_INSTALL_DEMO_CONFIG: true
bootstrap.memory_lock: true
OPENSEARCH_JAVA_OPTS: "-Xms2g -Xmx2g"
ports:
- "9200:9200"
13 changes: 11 additions & 2 deletions engine/servers/opensearch-single-node/docker-compose.yaml
@@ -2,14 +2,23 @@ version: '3.5'

services:
opensearch:
image: opensearchproject/opensearch:2.10.0
image: opensearchproject/opensearch:2.17.1
environment:
discovery.type: "single-node"
plugins.security.disabled: true
DISABLE_SECURITY_PLUGIN: true
DISABLE_INSTALL_DEMO_CONFIG: true
bootstrap.memory_lock: true
OPENSEARCH_JAVA_OPTS: "-Xms4g -Xmx4g"
ports:
- "9200:9200"
- "9300:9300"
ulimits:
memlock:
soft: -1 # Set memlock to unlimited (no soft or hard limit)
hard: -1
nofile:
soft: 65536 # Maximum number of open files for the opensearch user - set to at least 65536
hard: 65536
logging:
driver: "json-file"
options:
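
With DISABLE_SECURITY_PLUGIN set to true, both compose files expose the node over plain HTTP with no credentials. A minimal connection check under those assumptions (not part of this commit):

from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}], use_ssl=False)
print(client.info()["version"]["number"])  # expect 2.17.1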