Add improvements for the OpenSearch engine and update the Docker Compose setup #214

Open · wants to merge 2 commits into base: master
67 changes: 49 additions & 18 deletions engine/clients/opensearch/configure.py
@@ -1,7 +1,6 @@
from opensearchpy import NotFoundError, OpenSearch
from opensearchpy import OpenSearch

from benchmark.dataset import Dataset
from engine.base_client import IncompatibilityError
from engine.base_client.configure import BaseConfigurator
from engine.base_client.distances import Distance
from engine.clients.opensearch.config import (
@@ -10,6 +9,7 @@
OPENSEARCH_PORT,
OPENSEARCH_USER,
)
from engine.clients.opensearch.utils import get_index_thread_qty


class OpenSearchConfigurator(BaseConfigurator):
@@ -40,28 +40,48 @@ def __init__(self, host, collection_params: dict, connection_params: dict):
)

def clean(self):
try:
is_index_available = self.client.indices.exists(
index=OPENSEARCH_INDEX,
params={
"timeout": 300,
},
)
if is_index_available:
print(f"Deleting index: {OPENSEARCH_INDEX}, as it is already present")
self.client.indices.delete(
index=OPENSEARCH_INDEX,
params={
"timeout": 300,
},
)
except NotFoundError:
pass

def recreate(self, dataset: Dataset, collection_params):
if dataset.config.distance == Distance.DOT:
raise IncompatibilityError
if dataset.config.vector_size > 1024:
raise IncompatibilityError
self._update_cluster_settings()
distance = self.DISTANCE_MAPPING[dataset.config.distance]
if dataset.config.distance == Distance.COSINE:
distance = self.DISTANCE_MAPPING[Distance.DOT]
print(
f"Using distance type: {distance} as dataset distance is : {dataset.config.distance}"
)

self.client.indices.create(
index=OPENSEARCH_INDEX,
body={
"settings": {
"index": {
"knn": True,
"refresh_interval": -1,
"number_of_replicas": (
0
if collection_params.get("number_of_replicas") == None
else collection_params.get("number_of_replicas")
),
"number_of_shards": (
1
if collection_params.get("number_of_shards") == None
else collection_params.get("number_of_shards")
),
"knn.advanced.approximate_threshold": "-1",
}
},
"mappings": {
@@ -72,18 +92,13 @@ def recreate(self, dataset: Dataset, collection_params):
"method": {
**{
"name": "hnsw",
"engine": "lucene",
"space_type": self.DISTANCE_MAPPING[
dataset.config.distance
],
"parameters": {
"m": 16,
"ef_construction": 100,
},
"engine": "faiss",
"space_type": distance,
**collection_params.get("method"),
},
**collection_params.get("method"),
},
},
# this doesn't work for nmslib; we need to see what to do here, maybe remove them
**self._prepare_fields_config(dataset),
}
},
@@ -94,6 +109,16 @@ def recreate(self, dataset: Dataset, collection_params):
cluster_manager_timeout="5m",
)

def _update_cluster_settings(self):
index_thread_qty = get_index_thread_qty(self.client)
cluster_settings_body = {
"persistent": {
"knn.memory.circuit_breaker.limit": "75%", # putting a higher value to ensure that even with small cluster the latencies for vector search are good
"knn.algo_param.index_thread_qty": index_thread_qty,
}
}
self.client.cluster.put_settings(cluster_settings_body)

def _prepare_fields_config(self, dataset: Dataset):
return {
field_name: {
@@ -104,3 +129,9 @@ def _prepare_fields_config(self, dataset: Dataset):
}
for field_name, field_type in dataset.config.schema.items()
}

def execution_params(self, distance, vector_size) -> dict:
# normalize the vectors when cosine similarity is used
if distance == Distance.COSINE:
return {"normalize": "true"}
return {}
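
A note on the distance handling above: the index is now created with the faiss engine, which does not offer a cosine space type, so COSINE datasets are mapped to the inner-product (DOT) space and execution_params() asks the runner to normalize vectors. A minimal sketch of that normalization, assuming numpy is available (the helper name is illustrative and not part of this PR):

    import numpy as np

    def normalize_for_innerproduct(vector):
        # Hypothetical helper: dividing by the L2 norm makes inner-product scores
        # rank identically to cosine similarity, which is why execution_params()
        # returns {"normalize": "true"} for COSINE datasets.
        norm = np.linalg.norm(vector)
        return vector if norm == 0 else (np.asarray(vector) / norm).tolist()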
19 changes: 14 additions & 5 deletions engine/clients/opensearch/search.py
@@ -44,6 +44,7 @@ def init_client(cls, host, distance, connection_params: dict, search_params: dict):
basic_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD),
**init_params,
)
cls.distance = distance
cls.search_params = search_params

@classmethod
@@ -53,6 +54,11 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]:
"vector": {
"vector": query.vector,
"k": top,
"method_parameters": {
"ef_search": cls.search_params["config"][
"ef_search"
]  # ef_search is supplied at query time
},
}
}
}
@@ -70,15 +76,18 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]:
params={
"timeout": 60,
},
_source=False,
docvalue_fields=["_id"],
stored_fields="_none_",
)

return [
(uuid.UUID(hex=hit["_id"]).int, hit["_score"])
(uuid.UUID(hex=hit["fields"]["_id"][0]).int, hit["_score"])
for hit in res["hits"]["hits"]
]

@classmethod
def setup_search(cls):
if cls.search_params:
cls.client.indices.put_settings(
body=cls.search_params["config"], index=OPENSEARCH_INDEX
)
# Load the graphs in memory
warmup_endpoint = f"/_plugins/_knn/warmup/{OPENSEARCH_INDEX}"
cls.client.transport.perform_request("GET", warmup_endpoint)
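
For context, the approximate k-NN request that search_one builds now looks roughly like the sketch below; ef_search is read from search_params["config"], and the field name, vector, and numbers here are illustrative:

    query_body = {
        "query": {
            "knn": {
                "vector": {                        # name of the knn_vector field
                    "vector": [0.12, 0.34, 0.56],  # query embedding (example values)
                    "k": 10,                       # number of neighbours to return
                    "method_parameters": {
                        "ef_search": 128           # HNSW beam width, set per query
                    },
                }
            }
        }
    }
    # With _source disabled and docvalue_fields=["_id"], each hit carries the id in
    # hit["fields"]["_id"][0] instead of a stored document, keeping responses small.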
49 changes: 43 additions & 6 deletions engine/clients/opensearch/upload.py
@@ -1,4 +1,5 @@
import multiprocessing as mp
import time
import uuid
from typing import List

@@ -12,6 +13,10 @@
OPENSEARCH_PORT,
OPENSEARCH_USER,
)
from engine.clients.opensearch.utils import (
get_index_thread_qty_for_force_merge,
update_force_merge_threads,
)


class ClosableOpenSearch(OpenSearch):
@@ -62,10 +67,42 @@ def upload_batch(cls, batch: List[Record]):

@classmethod
def post_upload(cls, _distance):
cls.client.indices.forcemerge(
index=OPENSEARCH_INDEX,
params={
"timeout": 300,
},
)
# ensure the index is refreshed before the force merge
cls._refresh_index()
cls._update_vector_threshold_setting()
cls._force_merge_index()
# ensure that only force-merged segments remain
cls._refresh_index()
return {}

@classmethod
def _refresh_index(cls):
print(f"Refreshing index: {OPENSEARCH_INDEX}")
params = {"timeout": 300}
cls.client.indices.refresh(index=OPENSEARCH_INDEX, params=params)

@classmethod
def _update_vector_threshold_setting(cls):
body = {
# ensure that approximate graph creation is enabled
"index.knn.advanced.approximate_threshold": "0"
}
cls.client.indices.put_settings(index=OPENSEARCH_INDEX, body=body)

@classmethod
def _force_merge_index(cls):
index_thread_qty = get_index_thread_qty_for_force_merge(cls.client)
update_force_merge_threads(client=cls.client, index_thread_qty=index_thread_qty)
force_merge_endpoint = f"/{OPENSEARCH_INDEX}/_forcemerge?max_num_segments=1&wait_for_completion=false"
force_merge_task_id = cls.client.transport.perform_request(
"POST", force_merge_endpoint
)["task"]
SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC = 30
print(
f"Starting force merge on index: {OPENSEARCH_INDEX}, task_id: {force_merge_task_id}"
)
while True:
time.sleep(SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC)
task_status = cls.client.tasks.get(task_id=force_merge_task_id)
if task_status["completed"]:
break
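
The force merge is deliberately started with wait_for_completion=false: merging a large index down to a single segment can take far longer than a typical HTTP client timeout, so the call returns a task id that is then polled. A generic polling helper equivalent to the loop above might look like this (a sketch; wait_for_task is not part of this PR):

    import time

    from opensearchpy import OpenSearch

    def wait_for_task(client: OpenSearch, task_id: str, poll_interval_sec: int = 30):
        # Poll the tasks API until the long-running task (here: the force merge)
        # reports completion, instead of blocking on the original HTTP request.
        while True:
            time.sleep(poll_interval_sec)
            task_status = client.tasks.get(task_id=task_id)
            if task_status["completed"]:
                return task_status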
85 changes: 85 additions & 0 deletions engine/clients/opensearch/utils.py
@@ -0,0 +1,85 @@
from opensearchpy import OpenSearch


def get_index_thread_qty_for_force_merge(client: OpenSearch):
processors_per_node = get_cores_for_data_nodes(client=client)
# since only one shard does the work during a force merge, we can be aggressive with the parallelization factor
index_thread_qty = max(1, processors_per_node // 2)
print(f"Index thread qty for force merge: {index_thread_qty}")
return index_thread_qty


def get_index_thread_qty(client: OpenSearch):
processors_per_node = get_cores_for_data_nodes(client=client)
# since during indexing more than one shard will be active, we stay conservative with the parallelization factor
index_thread_qty = max(1, processors_per_node // 8)
print(f"Index thread qty for indexing: {index_thread_qty}")
return index_thread_qty


def get_cores_for_data_nodes(client: OpenSearch):
# Sample nodes info response that gets parsed:
# {
# "nodes": {
# "Or9Nm4UJR3-gcMOGwJhHHQ": {
# "roles": [
# "data",
# "ingest",
# "master",
# "remote_cluster_client"
# ],
# "os": {
# "refresh_interval_in_millis": 1000,
# "available_processors": 8,
# "allocated_processors": 8
# }
# },
# "A-cqbeekROeR3kzKhOXpRw": {
# "roles": [
# "data",
# "ingest",
# "master",
# "remote_cluster_client"
# ],
# "os": {
# "refresh_interval_in_millis": 1000,
# "available_processors": 8,
# "allocated_processors": 8
# }
# },
# "FrDs-vOMQ8yDZ0HEkDwRHA": {
# "roles": [
# "data",
# "ingest",
# "master",
# "remote_cluster_client"
# ],
# "os": {
# "refresh_interval_in_millis": 1000,
# "available_processors": 8,
# "allocated_processors": 8
# }
# }
# }
# }

nodes_stats_res = client.nodes.info(filter_path="nodes.*.roles,nodes.*.os")
nodes_data = nodes_stats_res.get("nodes")
data_node_count = 0
total_processors = 0
for node_id in nodes_data:
node_info = nodes_data.get(node_id)
roles = node_info["roles"]
os_info = node_info["os"]
if "data" in roles:
data_node_count += 1
total_processors += int(os_info["allocated_processors"])
processors_per_node = total_processors // data_node_count
return processors_per_node


def update_force_merge_threads(client: OpenSearch, index_thread_qty=1):
cluster_settings_body = {
"persistent": {"knn.algo_param.index_thread_qty": index_thread_qty}
}
client.cluster.put_settings(cluster_settings_body)
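
Taken together, these helpers tune knn.algo_param.index_thread_qty twice: conservatively (cores // 8) while several shards index in parallel, then aggressively (cores // 2) for the single-shard force merge. A rough usage sketch, assuming a local unsecured cluster:

    from opensearchpy import OpenSearch

    from engine.clients.opensearch.utils import (
        get_index_thread_qty,
        get_index_thread_qty_for_force_merge,
        update_force_merge_threads,
    )

    client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}], use_ssl=False)

    # Used by the configurator before index creation (many shards indexing at once).
    indexing_threads = get_index_thread_qty(client)

    # Used by the uploader right before the force merge (a single shard merging).
    merge_threads = get_index_thread_qty_for_force_merge(client)
    update_force_merge_threads(client=client, index_thread_qty=merge_threads)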
6 changes: 4 additions & 2 deletions engine/servers/opensearch-single-node-ci/docker-compose.yaml
@@ -2,10 +2,12 @@ version: '3.5'

services:
opensearch:
image: opensearchproject/opensearch:2.10.0
image: opensearchproject/opensearch:2.17.1
environment:
discovery.type: "single-node"
plugins.security.disabled: true
DISABLE_SECURITY_PLUGIN: true
DISABLE_INSTALL_DEMO_CONFIG: true
bootstrap.memory_lock: true
OPENSEARCH_JAVA_OPTS: "-Xms2g -Xmx2g"
ports:
- "9200:9200"
13 changes: 11 additions & 2 deletions engine/servers/opensearch-single-node/docker-compose.yaml
@@ -2,14 +2,23 @@ version: '3.5'

services:
opensearch:
image: opensearchproject/opensearch:2.10.0
image: opensearchproject/opensearch:2.18.0
environment:
discovery.type: "single-node"
plugins.security.disabled: true
DISABLE_SECURITY_PLUGIN: true
DISABLE_INSTALL_DEMO_CONFIG: true
bootstrap.memory_lock: true
OPENSEARCH_JAVA_OPTS: "-Xms4g -Xmx4g"
ports:
- "9200:9200"
- "9300:9300"
ulimits:
memlock:
soft: -1 # Set memlock to unlimited (no soft or hard limit)
hard: -1
nofile:
soft: 65536 # Maximum number of open files for the opensearch user - set to at least 65536
hard: 65536
logging:
driver: "json-file"
options:
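
With DISABLE_SECURITY_PLUGIN=true replacing the older plugins.security.disabled flag on these newer images, the node accepts plain HTTP without credentials. A quick smoke test from the benchmark host could look like this (a sketch, not part of the PR):

    from opensearchpy import OpenSearch

    # Talk to the single-node compose setup over plain HTTP; no auth is needed
    # because the security plugin is disabled.
    client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}], use_ssl=False)
    info = client.info()
    print(info["version"]["number"])  # expect 2.18.0 (or 2.17.1 for the CI compose)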