search_model added to index settings (#633)
Add `search_model` feature with backwards compatibility for existing indexes
vicilliar authored Nov 28, 2023
1 parent f96f953 commit 503625d
Showing 20 changed files with 1,505 additions and 105 deletions.
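In short: each index now stores a `search_model` (and, where relevant, `search_model_properties`) alongside `model`. `search_model` is used to vectorise search queries, while `model` continues to be used when adding documents; indexes created before this change carry no `search_model` and transparently fall back to `model` at search time. A minimal sketch of the autofilled defaults, assuming nothing is supplied by the caller (the model name is the example value from the settings schema, not a requirement):

# Illustrative only: index_defaults after autofilling, for an index created
# without an explicit search_model.
index_settings = {
    "index_defaults": {
        "model": "hf/all_datasets_v4_MiniLM-L6",
        # added by create_index.autofill_search_model() when not supplied:
        "search_model": "hf/all_datasets_v4_MiniLM-L6",
    }
}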
2 changes: 1 addition & 1 deletion src/marqo/s2_inference/s2_inference.py
@@ -57,7 +57,7 @@ def vectorise(model_name: str, content: Union[str, List[str]], model_properties:
if not device:
raise InternalError(message=f"vectorise (internal function) cannot be called without setting device!")

validated_model_properties = _validate_model_properties(model_name, model_properties)
validated_model_properties = _validate_model_properties(model_name, model_properties) # This will be called on model_properties or search_model_properties, depending on what vectorise was called with.
model_cache_key = _create_model_cache_key(model_name, device, validated_model_properties)

_update_available_models(
11 changes: 9 additions & 2 deletions src/marqo/tensor_search/backend.py
@@ -47,12 +47,19 @@ def get_index_info(
raise errors.NonTensorIndexError(
f"Error retrieving index info for index {index_name}")

# Identify index `model` from mapping metadata
if "model" in res[index_name]["mappings"]["_meta"]:
model_name = res[index_name]["mappings"]["_meta"]["model"]
else:
raise errors.NonTensorIndexError(
"get_index_info: couldn't identify embedding model name "
F"in index mappings! Mapping: {res}")

# Identify index `search_model` from mapping metadata
if "search_model" in res[index_name]["mappings"]["_meta"]:
search_model_name = res[index_name]["mappings"]["_meta"]["search_model"]
else:
search_model_name = None # placeholder for backwards compatibility

if "index_settings" in res[index_name]["mappings"]["_meta"]:
index_settings = res[index_name]["mappings"]["_meta"]["index_settings"]
@@ -63,7 +70,7 @@

index_properties = res[index_name]["mappings"]["properties"]

index_info = IndexInfo(model_name=model_name, properties=index_properties,
index_info = IndexInfo(model_name=model_name, search_model_name=search_model_name, properties=index_properties,
index_settings=index_settings)
get_cache()[index_name] = index_info
return index_info
Expand All @@ -84,7 +91,6 @@ def add_customer_field_properties(config: Config, index_name: str,
customer_field_names: list of 2-tuples. The first elem in the tuple is
the new fieldnames the customers have made. The second elem is the
inferred OpenSearch data type.
model_properties: properties of the machine learning model
Returns:
HTTP Response
@@ -161,6 +167,7 @@ def add_customer_field_properties(config: Config, index_name: str,

get_cache()[index_name] = IndexInfo(
model_name=existing_info.model_name,
search_model_name=existing_info.search_model_name,
properties=new_index_properties,
index_settings=existing_info.index_settings.copy()
)
2 changes: 1 addition & 1 deletion src/marqo/tensor_search/configs.py
@@ -8,7 +8,7 @@ def get_default_index_settings():
NsFields.index_defaults: {
NsFields.treat_urls_and_pointers_as_images: False, # only used for models that have text and vision encoders
NsFields.model: ns_enums.MlModel.bert,
# NsFields.model_properties: dict(),
# search_model not here, as it is dynamically set based on model
NsFields.normalize_embeddings: True,
NsFields.text_preprocessing: {
NsFields.split_length: 2,
39 changes: 39 additions & 0 deletions src/marqo/tensor_search/create_index.py
@@ -0,0 +1,39 @@
"""Functions used to fulfill the add_documents endpoint"""
import copy
from contextlib import contextmanager

import math
import threading
import random

from typing import List, Optional, Tuple, ContextManager, Union
import PIL
from PIL.ImageFile import ImageFile
from marqo.s2_inference import clip_utils
from marqo.tensor_search.telemetry import RequestMetricsStore, RequestMetrics
import marqo.errors as errors
from marqo.tensor_search import utils
from marqo.tensor_search.enums import IndexSettingsField
from marqo.tensor_search import constants


def autofill_search_model(index_settings: dict):
"""
Autofills `search_model` and `search_model_properties` in index settings if not provided.
This defaults to the same values as `model` and `model_properties` respectively.
`model` should always be set at this point, since this function runs after autofilling the other fields.
"""
new_index_settings = copy.deepcopy(index_settings)
if IndexSettingsField.search_model not in new_index_settings[IndexSettingsField.index_defaults]:
# set search_model to model
try:
new_index_settings[IndexSettingsField.index_defaults][IndexSettingsField.search_model] = new_index_settings[IndexSettingsField.index_defaults][IndexSettingsField.model]
except KeyError:
raise errors.InternalError("Index settings is missing `model` key. Nothing to set `search_model` to.")

# set search_model_properties to model_properties (if they exist)
if IndexSettingsField.model_properties in new_index_settings[IndexSettingsField.index_defaults]:
new_index_settings[IndexSettingsField.index_defaults][IndexSettingsField.search_model_properties] = new_index_settings[IndexSettingsField.index_defaults][IndexSettingsField.model_properties]

return new_index_settings
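
For reference, a hedged usage sketch of `autofill_search_model` (the custom model name and properties are invented for illustration; the dict keys match the `IndexSettingsField` string constants used above):

from marqo.tensor_search.create_index import autofill_search_model

settings = {
    "index_defaults": {
        "model": "my-custom-model",               # hypothetical custom model
        "model_properties": {"dimensions": 384},  # hypothetical properties
        # no search_model / search_model_properties supplied
    }
}

filled = autofill_search_model(settings)
assert filled["index_defaults"]["search_model"] == "my-custom-model"
assert filled["index_defaults"]["search_model_properties"] == {"dimensions": 384}
assert filled is not settings  # the input settings are deep-copied, not mutated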
2 changes: 2 additions & 0 deletions src/marqo/tensor_search/enums.py
@@ -44,6 +44,8 @@ class IndexSettingsField:
treat_urls_and_pointers_as_images = "treat_urls_and_pointers_as_images"
model = "model"
model_properties = "model_properties"
search_model = "search_model"
search_model_properties = "search_model_properties"
normalize_embeddings = "normalize_embeddings"

text_preprocessing = "text_preprocessing"
43 changes: 32 additions & 11 deletions src/marqo/tensor_search/models/index_info.py
@@ -10,12 +10,20 @@


# For use outside of this module
def get_model_properties_from_index_defaults(index_defaults: Dict, model_name: str):
""" Gets model_properties from index defaults if available. Otherwise, it attempts to get it from the model registry.
def get_model_properties_from_index_defaults(index_defaults: Dict, model_name: str, properties_key: str):
""" Gets model_properties or search_model_properties from index defaults if available. Otherwise, it attempts to get it from the model registry.
Args:
index_defaults: index_defaults from index_settings
model_name: name of the model or search_model
properties_key: Either NsField.model_properties or NsField.search_model_properties
Returns:
dict of model properties. Could either represent model_properties or search_model_properties.
"""

try:
model_properties = index_defaults[NsField.model_properties]
model_properties = index_defaults[properties_key]
except KeyError:
model_properties = None

@@ -27,24 +35,25 @@ def get_model_properties_from_index_defaults(model_name: s
elif model_properties is None:
try:
model_properties = s2_inference.get_model_properties_from_registry(model_name)
except s2_inference_errors.UnknownModelError:
except s2_inference_errors.UnknownModelError as e:
raise errors.InvalidArgError(
f"Could not find model properties for model={model_name}. "
f"Please check that the model name is correct. "
f"Please provide model_properties if the model is a custom model and is not supported by default")

f"Could not find properties in registry for model: {model_name}. "
f"If this is a registry model, please check that the model name is correct. "
f"Please provide {properties_key} if the model is a custom model.")

return model_properties


class IndexInfo(NamedTuple):
"""
model_name: name of the ML model used to encode the data
model_name: name of the ML model used to encode the data (for add documents)
search_model_name: name of the ML model used to encode the data (for search)
properties: keys are different index field names, values
provide info about the properties
index_settings: settings for the index
"""
model_name: str
search_model_name: str
properties: dict
index_settings: dict

@@ -93,9 +102,21 @@ def get_text_properties(self) -> dict:
def get_model_properties(self) -> dict:
index_defaults = self.index_settings["index_defaults"]
return get_model_properties_from_index_defaults(
index_defaults=index_defaults, model_name=self.model_name
index_defaults=index_defaults, model_name=self.model_name, properties_key=NsField.model_properties
)

def get_search_model_properties(self) -> dict:
"""
Should not be called when search_model_name is None; in that case the tensor_search module
should call get_model_properties instead.
"""
if self.search_model_name is None:
raise errors.InternalError("Cannot get `search_model_properties` when `search_model` does not exist.")

index_defaults = self.index_settings["index_defaults"]
return get_model_properties_from_index_defaults(
index_defaults=index_defaults, model_name=self.search_model_name, properties_key=NsField.search_model_properties
)


def get_true_text_properties(self) -> dict:
"""returns a dict containing only names and properties of fields that
9 changes: 9 additions & 0 deletions src/marqo/tensor_search/models/settings_object.py
@@ -42,6 +42,15 @@
NsFields.model_properties: {
"type": "object",
},
NsFields.search_model: {
"type": "string",
"examples": [
"hf/all_datasets_v4_MiniLM-L6"
]
},
NsFields.search_model_properties: {
"type": "object",
},
NsFields.normalize_embeddings: {
"type": "boolean",
"examples": [
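These two schema entries also let a caller pin a different query-time model explicitly at index creation. A hedged sketch of such a settings payload (model names and properties are hypothetical); as the `_add_knn_field` change further down shows, index creation fails with `InvalidArgError` if the two models report different `dimensions`:

# Hypothetical settings: separate indexing and query models with matching dimensions.
index_settings = {
    "index_defaults": {
        "model": "my-custom-indexing-model",
        "model_properties": {"dimensions": 768},
        "search_model": "my-custom-query-model",
        "search_model_properties": {"dimensions": 768},  # must match model_properties["dimensions"]
    }
}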
4 changes: 0 additions & 4 deletions src/marqo/tensor_search/on_start_script.py
@@ -190,10 +190,6 @@ def _preload_model(model, content, device):
)
elif isinstance(model, dict):
# For models from URL
"""
TODO: include validation from on start script (model name properties etc)
_check_model_name(index_settings)
"""
try:
_ = vectorise(
model_name=model["model"],
81 changes: 57 additions & 24 deletions src/marqo/tensor_search/tensor_search.py
@@ -51,7 +51,7 @@
EnvVars, MappingsObjectType, DocumentFieldType
)
from marqo.tensor_search.enums import IndexSettingsField as NsField
from marqo.tensor_search import utils, backend, validation, configs, add_docs, filtering
from marqo.tensor_search import utils, backend, validation, configs, add_docs, filtering, create_index
from marqo.tensor_search.formatting import _clean_doc
from marqo.tensor_search.index_meta_cache import get_cache, get_index_info
from marqo.tensor_search import index_meta_cache
@@ -92,7 +92,7 @@ def _get_dimension_from_model_properties(model_properties: dict) -> int:
return validation.validate_model_dimensions(model_properties["dimensions"])
except KeyError:
raise errors.InvalidArgError(
"The given model properties must contain a 'dimensions' key."
f"The given model properties must contain a 'dimensions' key. Received: {model_properties}"
)
except errors.InternalError as e:
# This is caused by bad `dimensions` validation.
@@ -101,24 +101,42 @@

def _add_knn_field(ix_settings: dict):
"""
This adds the OpenSearch knn field to the index's mappings
This adds the OpenSearch knn field to the index's mappings.
Sets the knn field's dimension from the model properties, after checking that the dimensions in model_properties and search_model_properties match.
Args:
ix_settings: the index settings
"""

ix_defaults = ix_settings["mappings"]["_meta"]["index_settings"][NsField.index_defaults]

# get model properties
model_prop = get_model_properties_from_index_defaults(
index_defaults=(
ix_settings["mappings"]["_meta"]
["index_settings"][NsField.index_defaults]),
model_name=(
ix_settings["mappings"]["_meta"]
["index_settings"][NsField.index_defaults][NsField.model])
index_defaults=ix_defaults,
model_name=ix_defaults[NsField.model],
properties_key=NsField.model_properties
)

# get search model properties
search_model_prop = get_model_properties_from_index_defaults(
index_defaults=ix_defaults,
model_name=ix_defaults[NsField.search_model],
properties_key=NsField.search_model_properties
)

# validate dimensions of model properties and search model properties match
model_dim = _get_dimension_from_model_properties(model_prop)
search_model_dim = _get_dimension_from_model_properties(search_model_prop)
if model_dim != search_model_dim:
raise errors.InvalidArgError(
f"Model properties dimensions ({model_dim}) and search model properties dimensions ({search_model_dim}) "
f"must be equal."
)

ix_settings_with_knn = ix_settings.copy()
ix_settings_with_knn["mappings"]["properties"][TensorField.chunks]["properties"][TensorField.marqo_knn_field] = {
"type": "knn_vector",
"dimension": _get_dimension_from_model_properties(model_prop),
"dimension": model_dim,
"method": (
ix_settings["mappings"]["_meta"]
["index_settings"][NsField.index_defaults][NsField.ann_parameters]
Expand All @@ -138,11 +156,14 @@ def create_vector_index(

if index_settings is not None:
if NsField.index_defaults in index_settings:
_check_model_name(index_settings)
validation.validate_model_name_and_properties(index_settings)
the_index_settings = _autofill_index_settings(index_settings=index_settings)
else:
the_index_settings = configs.get_default_index_settings()

# `search_model` is determined by `model` and `model_properties`
the_index_settings = create_index.autofill_search_model(the_index_settings)

validation.validate_settings_object(settings_object=the_index_settings)

vector_index_settings = {
@@ -192,31 +213,26 @@ def create_vector_index(
max_os_fields = _marqo_field_limit_to_os_limit(int(max_marqo_fields))
vector_index_settings["settings"]["mapping"] = {"total_fields": {"limit": int(max_os_fields)}}

# Add model and search model names to mappings metadata.
model_name = the_index_settings[NsField.index_defaults][NsField.model]
vector_index_settings["mappings"]["_meta"][NsField.index_settings] = the_index_settings
search_model_name = the_index_settings[NsField.index_defaults][NsField.search_model]
vector_index_settings["mappings"]["_meta"]["model"] = model_name
vector_index_settings["mappings"]["_meta"]["search_model"] = search_model_name

vector_index_settings["mappings"]["_meta"][NsField.index_settings] = the_index_settings

vector_index_settings_with_knn = _add_knn_field(ix_settings=vector_index_settings)

logger.debug(f"Creating index {index_name} with settings: {vector_index_settings_with_knn}")
response = HttpRequests(config).put(path=index_name, body=vector_index_settings_with_knn)

get_cache()[index_name] = IndexInfo(
model_name=model_name, properties=vector_index_settings_with_knn["mappings"]["properties"].copy(),
model_name=model_name, search_model_name=search_model_name, properties=vector_index_settings_with_knn["mappings"]["properties"].copy(),
index_settings=the_index_settings
)
return response


def _check_model_name(index_settings):
"""Ensures that if model_properties is given, then model_name is given as well
"""
model_name = index_settings[NsField.index_defaults].get(NsField.model)
model_properties = index_settings[NsField.index_defaults].get(NsField.model_properties)
if model_properties is not None and model_name is None:
raise s2_inference_errors.UnknownModelError(f"No model name found for model_properties={model_properties}")


def _marqo_field_limit_to_os_limit(marqo_index_field_limit: int) -> int:
"""Translates a Marqo Index Field limit (that a Marqo user will set)
into the equivalent limit for Marqo-OS
@@ -249,6 +265,7 @@ def _autofill_index_settings(index_settings: dict):

copied_settings = utils.merge_dicts(default_settings, copied_settings)

# Default to CLIP model if we are using treat_urls_and_pointers_as_images
if NsField.treat_urls_and_pointers_as_images in copied_settings[NsField.index_defaults] and \
copied_settings[NsField.index_defaults][NsField.treat_urls_and_pointers_as_images] is True \
and copied_settings[NsField.index_defaults][NsField.model] is None:
@@ -1454,6 +1471,19 @@ def gather_documents_from_response(resp: List[List[Dict[str, Any]]]) -> Dict[str
return gathered_docs


def determine_model_for_search_vectorisation(index_info: IndexInfo) -> Tuple[str, Dict[str, Any]]:
"""
Returns search_model_name and search_model_properties for vectorising search queries if they exist.
If they don't, returns the model_name and model_properties.
This is for backwards compatibility for indexes without search_model.
"""
if index_info.search_model_name is None:
return (index_info.model_name, index_info.get_model_properties())

logger.debug(f"Using search_model for vectorising search queries: {index_info.search_model_name}")
return (index_info.search_model_name, index_info.get_search_model_properties())


def assign_query_to_vector_job(
q: BulkSearchQueryEntity, jobs: Dict[JHash, VectorisedJobs], grouped_content: Tuple[List[str], List[str]],
index_info: IndexInfo, device: str) -> List[VectorisedJobPointer]:
@@ -1479,11 +1509,14 @@ def assign_query_to_vector_job(
"assign_query_to_vector_job() expects param `grouped_content` with 2 elems. Instead received"
f" `grouped_content` with {len(grouped_content)} elems")
ptrs = []

vectorise_model, vectorise_model_properties = determine_model_for_search_vectorisation(index_info) # Use search_model if it exists, otherwise use model

for i, grouped_content in enumerate(grouped_content):
content_type = 'text' if i == 0 else 'image'
vector_job = VectorisedJobs(
model_name=index_info.model_name,
model_properties=index_info.get_model_properties(),
model_name=vectorise_model,
model_properties=vectorise_model_properties,
content=grouped_content,
device=device,
normalize_embeddings=index_info.index_settings['index_defaults']['normalize_embeddings'],
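To summarise the backwards-compatibility path at query time, here is a hedged sketch (not a test from this commit) of how `determine_model_for_search_vectorisation` treats an index created before `search_model` existed; the `IndexInfo` fields other than the model names are stubbed out:

from marqo.tensor_search.models.index_info import IndexInfo
from marqo.tensor_search.tensor_search import determine_model_for_search_vectorisation

old_style_index = IndexInfo(
    model_name="hf/all_datasets_v4_MiniLM-L6",
    search_model_name=None,  # what get_index_info() sets for pre-existing indexes
    properties={},
    index_settings={"index_defaults": {}},  # minimal stub; no stored model_properties
)

model, props = determine_model_for_search_vectorisation(old_style_index)
# model == "hf/all_datasets_v4_MiniLM-L6"
# props come from the model registry, exactly as before this change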