
Commit

[pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
pre-commit-ci[bot] committed Nov 6, 2024
1 parent 7acee94 commit 5d74fcd
Showing 4 changed files with 102 additions and 93 deletions.
25 changes: 14 additions & 11 deletions engine/clients/opensearch/configure.py
@@ -1,4 +1,4 @@
-from opensearchpy import NotFoundError, OpenSearch
+from opensearchpy import OpenSearch

 from benchmark.dataset import Dataset
 from engine.base_client.configure import BaseConfigurator
@@ -40,26 +40,29 @@ def __init__(self, host, collection_params: dict, connection_params: dict):
         )

     def clean(self):
-        is_index_available = self.client.indices.exists(index=OPENSEARCH_INDEX,
+        is_index_available = self.client.indices.exists(
+            index=OPENSEARCH_INDEX,
             params={
                 "timeout": 300,
-            })
-        if(is_index_available):
+            },
+        )
+        if is_index_available:
             print(f"Deleting index: {OPENSEARCH_INDEX}, as it is already present")
             self.client.indices.delete(
                 index=OPENSEARCH_INDEX,
                 params={
                     "timeout": 300,
                 },
             )

-
     def recreate(self, dataset: Dataset, collection_params):
         self._update_cluster_settings()
         distance = self.DISTANCE_MAPPING[dataset.config.distance]
         if dataset.config.distance == Distance.COSINE:
             distance = self.DISTANCE_MAPPING[Distance.DOT]
-            print(f"Using distance type: {distance} as dataset distance is : {dataset.config.distance}")
+            print(
+                f"Using distance type: {distance} as dataset distance is : {dataset.config.distance}"
+            )

         self.client.indices.create(
             index=OPENSEARCH_INDEX,
@@ -70,7 +73,7 @@ def recreate(self, dataset: Dataset, collection_params):
                         "refresh_interval": -1,
                         "number_of_replicas": 0,
                         "number_of_shards": 1,
-                        "knn.advanced.approximate_threshold": "-1"
+                        "knn.advanced.approximate_threshold": "-1",
                     }
                 },
                 "mappings": {
@@ -83,7 +86,7 @@ def recreate(self, dataset: Dataset, collection_params):
                             "name": "hnsw",
                             "engine": "faiss",
                             "space_type": distance,
-                            **collection_params.get("method")
+                            **collection_params.get("method"),
                         },
                     },
                 },
@@ -102,8 +105,8 @@ def _update_cluster_settings(self):
         index_thread_qty = get_index_thread_qty(self.client)
         cluster_settings_body = {
             "persistent": {
-                "knn.memory.circuit_breaker.limit": "75%", # putting a higher value to ensure that even with small cluster the latencies for vector search are good
-                "knn.algo_param.index_thread_qty": index_thread_qty
+                "knn.memory.circuit_breaker.limit": "75%",  # putting a higher value to ensure that even with small cluster the latencies for vector search are good
+                "knn.algo_param.index_thread_qty": index_thread_qty,
             }
         }
         self.client.cluster.put_settings(cluster_settings_body)
@@ -118,7 +121,7 @@ def _prepare_fields_config(self, dataset: Dataset):
             }
             for field_name, field_type in dataset.config.schema.items()
         }
-
+
     def execution_params(self, distance, vector_size) -> dict:
         # normalize the vectors if cosine similarity is there.
         if distance == Distance.COSINE:
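For reference, a minimal sketch of the index layout that the reformatted recreate() builds. The host, index name "bench", dimension, and HNSW parameters below are placeholders (the benchmark reads them from its config and collection_params), and "knn": True is assumed, since that part of the settings is collapsed in this diff:

from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

client.indices.create(
    index="bench",  # hypothetical name; the benchmark uses OPENSEARCH_INDEX
    body={
        "settings": {
            "index": {
                "knn": True,  # assumed; required for approximate k-NN search
                "refresh_interval": -1,  # pause refreshes during bulk upload
                "number_of_replicas": 0,
                "number_of_shards": 1,
                # "-1" skips native graph builds during ingestion; structures
                # are built later, around the force-merge step
                "knn.advanced.approximate_threshold": "-1",
            }
        },
        "mappings": {
            "properties": {
                "vector": {
                    "type": "knn_vector",
                    "dimension": 128,  # placeholder; taken from the dataset config
                    "method": {
                        "name": "hnsw",
                        "engine": "faiss",
                        "space_type": "innerproduct",
                        # placeholder values; the benchmark spreads them in
                        # from collection_params.get("method")
                        "parameters": {"m": 16, "ef_construction": 100},
                    },
                }
            }
        },
    },
)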
16 changes: 8 additions & 8 deletions engine/clients/opensearch/search.py
@@ -5,7 +5,6 @@
 from opensearchpy import OpenSearch

 from dataset_reader.base_reader import Query
-from engine.base_client.distances import Distance
 from engine.base_client.search import BaseSearcher
 from engine.clients.opensearch.config import (
     OPENSEARCH_INDEX,
@@ -14,7 +13,6 @@
     OPENSEARCH_USER,
 )
 from engine.clients.opensearch.parser import OpenSearchConditionParser
-import numpy as np


 class ClosableOpenSearch(OpenSearch):
@@ -56,9 +54,11 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]:
                 "vector": {
                     "vector": query.vector,
                     "k": top,
-                    "method_parameters" : {
-                        "ef_search": cls.search_params["config"]["ef_search"] # ef_search parameter is added in the query time
-                    }
+                    "method_parameters": {
+                        "ef_search": cls.search_params["config"][
+                            "ef_search"
+                        ]  # ef_search parameter is added in the query time
+                    },
                 }
             }
         }
@@ -80,7 +80,7 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]:
             docvalue_fields=["_id"],
             stored_fields="_none_",
         )
-
+
         return [
             (uuid.UUID(hex=hit["fields"]["_id"][0]).int, hit["_score"])
             for hit in res["hits"]["hits"]
@@ -89,5 +89,5 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]:
     @classmethod
     def setup_search(cls):
         # Load the graphs in memory
-        warmup_endpoint = f'/_plugins/_knn/warmup/{OPENSEARCH_INDEX}'
-        cls.client.transport.perform_request('GET', warmup_endpoint)
+        warmup_endpoint = f"/_plugins/_knn/warmup/{OPENSEARCH_INDEX}"
+        cls.client.transport.perform_request("GET", warmup_endpoint)
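A standalone sketch of the query shape that search_one() issues. It assumes the hypothetical "bench" index above and a recent OpenSearch (2.16+, where the knn query accepts method_parameters); the real client's filter conditions and docvalue fields are omitted:

from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])
query_vector = [0.1] * 128  # placeholder; must match the indexed dimension

res = client.search(
    index="bench",  # hypothetical name, as in the sketch above
    body={
        "size": 10,
        "query": {
            "knn": {
                "vector": {  # field name from the benchmark's mapping
                    "vector": query_vector,
                    "k": 10,
                    # per-query recall/latency knob; larger ef_search means
                    # higher recall at higher latency
                    "method_parameters": {"ef_search": 100},
                }
            }
        },
    },
)
hits = [(hit["_id"], hit["_score"]) for hit in res["hits"]["hits"]]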
28 changes: 17 additions & 11 deletions engine/clients/opensearch/upload.py
@@ -1,20 +1,22 @@
 import multiprocessing as mp
-import uuid
 import time
+import uuid
 from typing import List

 from opensearchpy import OpenSearch

 from dataset_reader.base_reader import Record
-from engine.base_client.distances import Distance
 from engine.base_client.upload import BaseUploader
 from engine.clients.opensearch.config import (
     OPENSEARCH_INDEX,
     OPENSEARCH_PASSWORD,
     OPENSEARCH_PORT,
     OPENSEARCH_USER,
 )
-from engine.clients.opensearch.utils import get_index_thread_qty_for_force_merge, update_force_merge_threads
+from engine.clients.opensearch.utils import (
+    get_index_thread_qty_for_force_merge,
+    update_force_merge_threads,
+)


 class ClosableOpenSearch(OpenSearch):
@@ -76,9 +78,9 @@ def post_upload(cls, _distance):
     @classmethod
     def _refresh_index(cls):
         print(f"Refreshing index: {OPENSEARCH_INDEX}")
-        params={"timeout": 300}
+        params = {"timeout": 300}
         cls.client.indices.refresh(index=OPENSEARCH_INDEX, params=params)
-
+
     @classmethod
     def _update_vector_threshold_setting(cls):
         body = {
@@ -90,13 +92,17 @@ def _update_vector_threshold_setting(cls):
     @classmethod
     def _force_merge_index(cls):
         index_thread_qty = get_index_thread_qty_for_force_merge(cls.client)
-        update_force_merge_threads(client = cls.client, index_thread_qty = index_thread_qty)
-        force_merge_endpoint = f'/{OPENSEARCH_INDEX}/_forcemerge?max_num_segments=1&wait_for_completion=false'
-        force_merge_task_id = cls.client.transport.perform_request('POST', force_merge_endpoint)['task']
+        update_force_merge_threads(client=cls.client, index_thread_qty=index_thread_qty)
+        force_merge_endpoint = f"/{OPENSEARCH_INDEX}/_forcemerge?max_num_segments=1&wait_for_completion=false"
+        force_merge_task_id = cls.client.transport.perform_request(
+            "POST", force_merge_endpoint
+        )["task"]
         SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC = 30
-        print(f"Starting force merge on index: {OPENSEARCH_INDEX}, task_id: {force_merge_task_id}")
+        print(
+            f"Starting force merge on index: {OPENSEARCH_INDEX}, task_id: {force_merge_task_id}"
+        )
         while True:
             time.sleep(SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC)
             task_status = cls.client.tasks.get(task_id=force_merge_task_id)
-            if task_status['completed']:
-                break
+            if task_status["completed"]:
+                break
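The force-merge flow above merges each shard down to one segment without holding an HTTP connection open for the whole operation. A condensed sketch of the same pattern, again assuming a local cluster and the hypothetical "bench" index:

import time

from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

# wait_for_completion=false returns a task id immediately instead of blocking
endpoint = "/bench/_forcemerge?max_num_segments=1&wait_for_completion=false"
task_id = client.transport.perform_request("POST", endpoint)["task"]

while True:
    time.sleep(30)  # same 30 s polling interval the uploader uses
    if client.tasks.get(task_id=task_id)["completed"]:
        break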
126 changes: 63 additions & 63 deletions engine/clients/opensearch/utils.py
@@ -1,12 +1,14 @@
 from opensearchpy import OpenSearch

+
 def get_index_thread_qty_for_force_merge(client: OpenSearch):
     processors_per_node = get_cores_for_data_nodes(client=client)
     # since during force merge only 1 shard will be doing the merge we can be aggressive in parallelization factor
     index_thread_qty = max(1, processors_per_node // 2)
     print(f"Index thread qty for force merge: {index_thread_qty}")
     return index_thread_qty

+
 def get_index_thread_qty(client: OpenSearch):
     processors_per_node = get_cores_for_data_nodes(client=client)
     # since during index more than 1 shard will be doing indexing, we are becoming conservative in parallelization factor
@@ -16,70 +18,68 @@ def get_index_thread_qty(client: OpenSearch):


 def get_cores_for_data_nodes(client: OpenSearch):
-  # Sample nodes info response which is getting parsed.
-  # {
-  #     "nodes": {
-  #         "Or9Nm4UJR3-gcMOGwJhHHQ": {
-  #             "roles": [
-  #                 "data",
-  #                 "ingest",
-  #                 "master",
-  #                 "remote_cluster_client"
-  #             ],
-  #             "os": {
-  #                 "refresh_interval_in_millis": 1000,
-  #                 "available_processors": 8,
-  #                 "allocated_processors": 8
-  #             }
-  #         },
-  #         "A-cqbeekROeR3kzKhOXpRw": {
-  #             "roles": [
-  #                 "data",
-  #                 "ingest",
-  #                 "master",
-  #                 "remote_cluster_client"
-  #             ],
-  #             "os": {
-  #                 "refresh_interval_in_millis": 1000,
-  #                 "available_processors": 8,
-  #                 "allocated_processors": 8
-  #             }
-  #         },
-  #         "FrDs-vOMQ8yDZ0HEkDwRHA": {
-  #             "roles": [
-  #                 "data",
-  #                 "ingest",
-  #                 "master",
-  #                 "remote_cluster_client"
-  #             ],
-  #             "os": {
-  #                 "refresh_interval_in_millis": 1000,
-  #                 "available_processors": 8,
-  #                 "allocated_processors": 8
-  #             }
-  #         }
-  #     }
-  # }
+    # Sample nodes info response which is getting parsed.
+    # {
+    #     "nodes": {
+    #         "Or9Nm4UJR3-gcMOGwJhHHQ": {
+    #             "roles": [
+    #                 "data",
+    #                 "ingest",
+    #                 "master",
+    #                 "remote_cluster_client"
+    #             ],
+    #             "os": {
+    #                 "refresh_interval_in_millis": 1000,
+    #                 "available_processors": 8,
+    #                 "allocated_processors": 8
+    #             }
+    #         },
+    #         "A-cqbeekROeR3kzKhOXpRw": {
+    #             "roles": [
+    #                 "data",
+    #                 "ingest",
+    #                 "master",
+    #                 "remote_cluster_client"
+    #             ],
+    #             "os": {
+    #                 "refresh_interval_in_millis": 1000,
+    #                 "available_processors": 8,
+    #                 "allocated_processors": 8
+    #             }
+    #         },
+    #         "FrDs-vOMQ8yDZ0HEkDwRHA": {
+    #             "roles": [
+    #                 "data",
+    #                 "ingest",
+    #                 "master",
+    #                 "remote_cluster_client"
+    #             ],
+    #             "os": {
+    #                 "refresh_interval_in_millis": 1000,
+    #                 "available_processors": 8,
+    #                 "allocated_processors": 8
+    #             }
+    #         }
+    #     }
+    # }

-  nodes_stats_res = client.nodes.info(filter_path="nodes.*.roles,nodes.*.os")
-  nodes_data = nodes_stats_res.get("nodes")
-  data_node_count = 0
-  total_processors = 0
-  for node_id in nodes_data:
-    node_info = nodes_data.get(node_id)
-    roles = node_info["roles"]
-    os_info = node_info["os"]
-    if 'data' in roles:
-      data_node_count += 1
-      total_processors += int(os_info['allocated_processors'])
-  processors_per_node = total_processors // data_node_count
-  return processors_per_node
+    nodes_stats_res = client.nodes.info(filter_path="nodes.*.roles,nodes.*.os")
+    nodes_data = nodes_stats_res.get("nodes")
+    data_node_count = 0
+    total_processors = 0
+    for node_id in nodes_data:
+        node_info = nodes_data.get(node_id)
+        roles = node_info["roles"]
+        os_info = node_info["os"]
+        if "data" in roles:
+            data_node_count += 1
+            total_processors += int(os_info["allocated_processors"])
+    processors_per_node = total_processors // data_node_count
+    return processors_per_node


 def update_force_merge_threads(client: OpenSearch, index_thread_qty=1):
-  cluster_settings_body = {
-    "persistent": {
-      "knn.algo_param.index_thread_qty": index_thread_qty
-    }
-  }
-  client.cluster.put_settings(cluster_settings_body)
+    cluster_settings_body = {
+        "persistent": {"knn.algo_param.index_thread_qty": index_thread_qty}
+    }
+    client.cluster.put_settings(cluster_settings_body)
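Taken together, a hypothetical usage sketch of these helpers: size knn.algo_param.index_thread_qty from the data nodes' core count before a force merge, then restore the default afterwards (the restore step is an assumption, not something this commit does; the import path follows this repo's layout):

from opensearchpy import OpenSearch

from engine.clients.opensearch.utils import (
    get_index_thread_qty_for_force_merge,
    update_force_merge_threads,
)

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

# cores_per_data_node // 2, but at least 1: aggressive, since only one
# shard merges at a time during a force merge
qty = get_index_thread_qty_for_force_merge(client)
update_force_merge_threads(client=client, index_thread_qty=qty)
# ... trigger the force merge here ...
update_force_merge_threads(client=client, index_thread_qty=1)  # restore default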
