diff --git a/engine/clients/opensearch/configure.py b/engine/clients/opensearch/configure.py index bd550917..285bafce 100644 --- a/engine/clients/opensearch/configure.py +++ b/engine/clients/opensearch/configure.py @@ -1,7 +1,6 @@ from opensearchpy import NotFoundError, OpenSearch from benchmark.dataset import Dataset -from engine.base_client import IncompatibilityError from engine.base_client.configure import BaseConfigurator from engine.base_client.distances import Distance from engine.clients.opensearch.config import ( @@ -10,6 +9,7 @@ OPENSEARCH_PORT, OPENSEARCH_USER, ) +from engine.clients.opensearch.utils import get_index_thread_qty class OpenSearchConfigurator(BaseConfigurator): @@ -40,21 +40,26 @@ def __init__(self, host, collection_params: dict, connection_params: dict): ) def clean(self): - try: + is_index_available = self.client.indices.exists(index=OPENSEARCH_INDEX, + params={ + "timeout": 300, + }) + if(is_index_available): + print(f"Deleting index: {OPENSEARCH_INDEX}, as it is already present") self.client.indices.delete( index=OPENSEARCH_INDEX, params={ "timeout": 300, }, ) - except NotFoundError: - pass + def recreate(self, dataset: Dataset, collection_params): - if dataset.config.distance == Distance.DOT: - raise IncompatibilityError - if dataset.config.vector_size > 1024: - raise IncompatibilityError + self._update_cluster_settings() + distance = self.DISTANCE_MAPPING[dataset.config.distance] + if dataset.config.distance == Distance.COSINE: + distance = self.DISTANCE_MAPPING[Distance.DOT] + print(f"Using distance type: {distance} as dataset distance is : {dataset.config.distance}") self.client.indices.create( index=OPENSEARCH_INDEX, @@ -62,6 +67,9 @@ def recreate(self, dataset: Dataset, collection_params): "settings": { "index": { "knn": True, + "refresh_interval": -1, + "number_of_replicas": 0, + "number_of_shards": 1 } }, "mappings": { @@ -72,18 +80,13 @@ def recreate(self, dataset: Dataset, collection_params): "method": { **{ "name": "hnsw", - "engine": "lucene", - "space_type": self.DISTANCE_MAPPING[ - dataset.config.distance - ], - "parameters": { - "m": 16, - "ef_construction": 100, - }, + "engine": "faiss", + "space_type": distance, + **collection_params.get("method") }, - **collection_params.get("method"), }, }, + # this doesn't work for nmslib, we need see what to do here, may be remove them **self._prepare_fields_config(dataset), } }, @@ -94,6 +97,16 @@ def recreate(self, dataset: Dataset, collection_params): cluster_manager_timeout="5m", ) + def _update_cluster_settings(self): + index_thread_qty = get_index_thread_qty(self.client) + cluster_settings_body = { + "persistent": { + "knn.memory.circuit_breaker.limit": "75%", # putting a higher value to ensure that even with small cluster the latencies for vector search are good + "knn.algo_param.index_thread_qty": index_thread_qty + } + } + self.client.cluster.put_settings(cluster_settings_body) + def _prepare_fields_config(self, dataset: Dataset): return { field_name: { @@ -104,3 +117,9 @@ def _prepare_fields_config(self, dataset: Dataset): } for field_name, field_type in dataset.config.schema.items() } + + def execution_params(self, distance, vector_size) -> dict: + # normalize the vectors if cosine similarity is there. + if distance == Distance.COSINE: + return {"normalize": "true"} + return {} diff --git a/engine/clients/opensearch/search.py b/engine/clients/opensearch/search.py index fc7b5cbf..a4d2ec6d 100644 --- a/engine/clients/opensearch/search.py +++ b/engine/clients/opensearch/search.py @@ -5,6 +5,7 @@ from opensearchpy import OpenSearch from dataset_reader.base_reader import Query +from engine.base_client.distances import Distance from engine.base_client.search import BaseSearcher from engine.clients.opensearch.config import ( OPENSEARCH_INDEX, @@ -13,6 +14,7 @@ OPENSEARCH_USER, ) from engine.clients.opensearch.parser import OpenSearchConditionParser +import numpy as np class ClosableOpenSearch(OpenSearch): @@ -44,6 +46,7 @@ def init_client(cls, host, distance, connection_params: dict, search_params: dic basic_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD), **init_params, ) + cls.distance = distance cls.search_params = search_params @classmethod @@ -53,6 +56,9 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]: "vector": { "vector": query.vector, "k": top, + "method_parameters" : { + "ef_search": cls.search_params["config"]["ef_search"] # ef_search parameter is added in the query time + } } } } @@ -70,15 +76,20 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]: params={ "timeout": 60, }, + _source=False, + docvalue_fields=["_id"], + stored_fields="_none_", ) + return [ - (uuid.UUID(hex=hit["_id"]).int, hit["_score"]) + (uuid.UUID(hex=hit["fields"]["_id"][0]).int, hit["_score"]) for hit in res["hits"]["hits"] ] @classmethod def setup_search(cls): - if cls.search_params: - cls.client.indices.put_settings( - body=cls.search_params["config"], index=OPENSEARCH_INDEX - ) + print(f"Search Parameters: {cls.search_params}") + # Load the graphs in memory + warmup_endpoint = f'/_plugins/_knn/warmup/{OPENSEARCH_INDEX}' + print(f"Loading indices to memory: {OPENSEARCH_INDEX}") + cls.client.transport.perform_request('GET', warmup_endpoint) diff --git a/engine/clients/opensearch/upload.py b/engine/clients/opensearch/upload.py index 0bc2427e..00efc340 100644 --- a/engine/clients/opensearch/upload.py +++ b/engine/clients/opensearch/upload.py @@ -1,10 +1,12 @@ import multiprocessing as mp import uuid +import time from typing import List from opensearchpy import OpenSearch from dataset_reader.base_reader import Record +from engine.base_client.distances import Distance from engine.base_client.upload import BaseUploader from engine.clients.opensearch.config import ( OPENSEARCH_INDEX, @@ -12,6 +14,7 @@ OPENSEARCH_PORT, OPENSEARCH_USER, ) +from engine.clients.opensearch.utils import get_index_thread_qty_for_force_merge, update_force_merge_threads class ClosableOpenSearch(OpenSearch): @@ -62,10 +65,29 @@ def upload_batch(cls, batch: List[Record]): @classmethod def post_upload(cls, _distance): - cls.client.indices.forcemerge( - index=OPENSEARCH_INDEX, - params={ - "timeout": 300, - }, - ) + # ensuring that index is refreshed before force merge + cls._refresh_index() + cls._force_merge_index() + # ensuring that only force merged segments are remaining + cls._refresh_index() return {} + + @classmethod + def _refresh_index(cls): + print(f"Refreshing index: {OPENSEARCH_INDEX}") + params={"timeout": 300} + cls.client.indices.refresh(index=OPENSEARCH_INDEX, params=params) + + @classmethod + def _force_merge_index(cls): + index_thread_qty = get_index_thread_qty_for_force_merge(cls.client) + update_force_merge_threads(client = cls.client, index_thread_qty = index_thread_qty) + force_merge_endpoint = f'/{OPENSEARCH_INDEX}/_forcemerge?max_num_segments=1&wait_for_completion=false' + force_merge_task_id = cls.client.transport.perform_request('POST', force_merge_endpoint)['task'] + SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC = 30 + print(f"Starting force merge on index: {OPENSEARCH_INDEX}, task_id: {force_merge_task_id}") + while True: + time.sleep(SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC) + task_status = cls.client.tasks.get(task_id=force_merge_task_id) + if task_status['completed']: + break \ No newline at end of file diff --git a/engine/clients/opensearch/utils.py b/engine/clients/opensearch/utils.py new file mode 100644 index 00000000..ccca60f0 --- /dev/null +++ b/engine/clients/opensearch/utils.py @@ -0,0 +1,85 @@ +from opensearchpy import OpenSearch + +def get_index_thread_qty_for_force_merge(client: OpenSearch): + processors_per_node = get_cores_for_data_nodes(client=client) + # since during force merge only 1 shard will be doing the merge we can be aggressive in parallelization factor + index_thread_qty = max(1, processors_per_node // 2) + print(f"Index thread qty for force merge: {index_thread_qty}") + return index_thread_qty + +def get_index_thread_qty(client: OpenSearch): + processors_per_node = get_cores_for_data_nodes(client=client) + # since during index more than 1 shard will be doing indexing, we are becoming conservative in parallelization factor + index_thread_qty = max(1, processors_per_node // 8) + print(f"Index thread qty for indexing: {index_thread_qty}") + return index_thread_qty + + +def get_cores_for_data_nodes(client: OpenSearch): + # Sample nodes info response which is getting parsed. + # { + # "nodes": { + # "Or9Nm4UJR3-gcMOGwJhHHQ": { + # "roles": [ + # "data", + # "ingest", + # "master", + # "remote_cluster_client" + # ], + # "os": { + # "refresh_interval_in_millis": 1000, + # "available_processors": 8, + # "allocated_processors": 8 + # } + # }, + # "A-cqbeekROeR3kzKhOXpRw": { + # "roles": [ + # "data", + # "ingest", + # "master", + # "remote_cluster_client" + # ], + # "os": { + # "refresh_interval_in_millis": 1000, + # "available_processors": 8, + # "allocated_processors": 8 + # } + # }, + # "FrDs-vOMQ8yDZ0HEkDwRHA": { + # "roles": [ + # "data", + # "ingest", + # "master", + # "remote_cluster_client" + # ], + # "os": { + # "refresh_interval_in_millis": 1000, + # "available_processors": 8, + # "allocated_processors": 8 + # } + # } + # } + # } + + nodes_stats_res = client.nodes.info(filter_path="nodes.*.roles,nodes.*.os") + nodes_data = nodes_stats_res.get("nodes") + data_node_count = 0 + total_processors = 0 + for node_id in nodes_data: + node_info = nodes_data.get(node_id) + roles = node_info["roles"] + os_info = node_info["os"] + if 'data' in roles: + data_node_count += 1 + total_processors += int(os_info['allocated_processors']) + processors_per_node = total_processors // data_node_count + return processors_per_node + + +def update_force_merge_threads(client: OpenSearch, index_thread_qty=1): + cluster_settings_body = { + "persistent": { + "knn.algo_param.index_thread_qty": index_thread_qty + } + } + client.cluster.put_settings(cluster_settings_body) diff --git a/engine/servers/opensearch-single-node-ci/docker-compose.yaml b/engine/servers/opensearch-single-node-ci/docker-compose.yaml index 18d97779..5b375057 100644 --- a/engine/servers/opensearch-single-node-ci/docker-compose.yaml +++ b/engine/servers/opensearch-single-node-ci/docker-compose.yaml @@ -2,10 +2,12 @@ version: '3.5' services: opensearch: - image: opensearchproject/opensearch:2.10.0 + image: opensearchproject/opensearch:2.17.1 environment: discovery.type: "single-node" - plugins.security.disabled: true + DISABLE_SECURITY_PLUGIN: true + DISABLE_INSTALL_DEMO_CONFIG: true + bootstrap.memory_lock: true OPENSEARCH_JAVA_OPTS: "-Xms2g -Xmx2g" ports: - "9200:9200" diff --git a/engine/servers/opensearch-single-node/docker-compose.yaml b/engine/servers/opensearch-single-node/docker-compose.yaml index 23af068f..c1f5b4a0 100644 --- a/engine/servers/opensearch-single-node/docker-compose.yaml +++ b/engine/servers/opensearch-single-node/docker-compose.yaml @@ -2,14 +2,23 @@ version: '3.5' services: opensearch: - image: opensearchproject/opensearch:2.10.0 + image: opensearchproject/opensearch:2.17.1 environment: discovery.type: "single-node" - plugins.security.disabled: true + DISABLE_SECURITY_PLUGIN: true + DISABLE_INSTALL_DEMO_CONFIG: true + bootstrap.memory_lock: true OPENSEARCH_JAVA_OPTS: "-Xms4g -Xmx4g" ports: - "9200:9200" - "9300:9300" + ulimits: + memlock: + soft: -1 # Set memlock to unlimited (no soft or hard limit) + hard: -1 + nofile: + soft: 65536 # Maximum number of open files for the opensearch user - set to at least 65536 + hard: 65536 logging: driver: "json-file" options: diff --git a/experiments/configurations/opensearch-single-node.json b/experiments/configurations/opensearch-single-node.json index dda9314f..5b01b8ac 100644 --- a/experiments/configurations/opensearch-single-node.json +++ b/experiments/configurations/opensearch-single-node.json @@ -5,10 +5,10 @@ "connection_params": { "request_timeout": 10000 }, - "collection_params": { "method": { "parameters": { "m": 16, "ef_construction": 100 } } }, + "collection_params": { "method": { "parameters": { "m": 16, "ef_construction": 128 } } }, "search_params": [ - { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } }, - { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } + { "parallel": 1, "config": { "ef_search": 128 } }, { "parallel": 1, "config": { "ef_search": 256 } }, { "parallel": 1, "config": { "ef_search": 512 } }, + { "parallel": 100, "config": { "ef_search": 128 } }, { "parallel": 100, "config": { "ef_search": 256 } }, { "parallel": 100, "config": { "ef_search": 512 } } ], "upload_params": { "parallel": 16 } }, @@ -20,8 +20,8 @@ }, "collection_params": { "method": { "parameters": { "m": 16, "ef_construction": 128 } } }, "search_params": [ - { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } }, - { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ], + { "parallel": 1, "config": { "ef_search": 128 } }, { "parallel": 1, "config": { "ef_search": 256 } }, { "parallel": 1, "config": { "ef_search": 512 } }, + { "parallel": 100, "config": { "ef_search": 128 } }, { "parallel": 100, "config": { "ef_search": 256 } }, { "parallel": 100, "config": { "ef_search": 512 } } ], "upload_params": { "parallel": 16 } }, { @@ -32,8 +32,8 @@ }, "collection_params": { "method": { "parameters": { "m": 32, "ef_construction": 128 } } }, "search_params": [ - { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } }, - { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ], + { "parallel": 1, "config": { "ef_search": 128 } }, { "parallel": 1, "config": { "ef_search": 256 } }, { "parallel": 1, "config": { "ef_search": 512 } }, + { "parallel": 100, "config": { "ef_search": 128 } }, { "parallel": 100, "config": { "ef_search": 256 } }, { "parallel": 100, "config": { "ef_search": 512 } } ], "upload_params": { "parallel": 16 } }, { @@ -44,8 +44,8 @@ }, "collection_params": { "method": { "parameters": { "m": 32, "ef_construction": 256 } } }, "search_params": [ - { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } }, - { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ], + { "parallel": 1, "config": { "ef_search": 128 } }, { "parallel": 1, "config": { "ef_search": 256 } }, { "parallel": 1, "config": { "ef_search": 512 } }, + { "parallel": 100, "config": { "ef_search": 128 } }, { "parallel": 100, "config": { "ef_search": 256 } }, { "parallel": 100, "config": { "ef_search": 512 } } ], "upload_params": { "parallel": 16 } }, { @@ -56,8 +56,8 @@ }, "collection_params": { "method": { "parameters": { "m": 32, "ef_construction": 512 } } }, "search_params": [ - { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } }, - { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ], + { "parallel": 1, "config": { "ef_search": 128 } }, { "parallel": 1, "config": { "ef_search": 256 } }, { "parallel": 1, "config": { "ef_search": 512 } }, + { "parallel": 100, "config": { "ef_search": 128 } }, { "parallel": 100, "config": { "ef_search": 256 } }, { "parallel": 100, "config": { "ef_search": 512 } } ], "upload_params": { "parallel": 16 } }, { @@ -68,8 +68,8 @@ }, "collection_params": { "method": { "parameters": { "m": 64, "ef_construction": 256 } } }, "search_params": [ - { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } }, - { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ], + { "parallel": 1, "config": { "ef_search": 128 } }, { "parallel": 1, "config": { "ef_search": 256 } }, { "parallel": 1, "config": { "ef_search": 512 } }, + { "parallel": 100, "config": { "ef_search": 128 } }, { "parallel": 100, "config": { "ef_search": 256 } }, { "parallel": 100, "config": { "ef_search": 512 } } ], "upload_params": { "parallel": 16 } }, { @@ -80,8 +80,8 @@ }, "collection_params": { "method": { "parameters": { "m": 64, "ef_construction": 512 } } }, "search_params": [ - { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } }, - { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ], + { "parallel": 1, "config": { "ef_search": 128 } }, { "parallel": 1, "config": { "ef_search": 256 } }, { "parallel": 1, "config": { "ef_search": 512 } }, + { "parallel": 100, "config": { "ef_search": 128 } }, { "parallel": 100, "config": { "ef_search": 256 } }, { "parallel": 100, "config": { "ef_search": 512 } } ], "upload_params": { "parallel": 16 } } ]