Response to issue #59 (#859)

Open · wants to merge 5 commits into base: mainline
Changes from all commits
1 change: 1 addition & 0 deletions marqo
Submodule marqo added at fff6cd
83 changes: 33 additions & 50 deletions src/marqo/tensor_search/on_start_script.py
@@ -1,30 +1,24 @@
import json
import os
import time

import torch

from threading import Lock
from PIL import Image

from marqo import config, marqo_docs, version
from marqo.api import exceptions
from marqo.connections import redis_driver
from marqo.s2_inference.s2_inference import vectorise
from marqo.s2_inference.processing.image import chunk_image
from marqo.s2_inference.constants import PATCH_MODELS
# we need to import backend before index_meta_cache to prevent circular import error:
from marqo.tensor_search import constants
from marqo.tensor_search import index_meta_cache, utils
from marqo.tensor_search.enums import EnvVars
from marqo.tensor_search.tensor_search_logging import get_logger

from tabulate import tabulate

logger = get_logger(__name__)


def on_start(config: config.Config):
    to_run_on_start = (
        BootstrapVespa(config),
@@ -79,50 +73,57 @@ def run(self):


class CUDAAvailable:
    """Checks the status of CUDA and logs a summary."""
    logger = get_logger('CUDAAvailable')

    def id_to_device(self, id):
        if id < 0:
            return 'CPU'
        return torch.cuda.get_device_name(id)

    def run(self):
        device_count = 0 if not torch.cuda.is_available() else torch.cuda.device_count()
        # use -1 for CPU
        device_ids = [-1] + list(range(device_count))

        device_summary = []
        for device_id in device_ids:
            device_name = self.id_to_device(device_id)
            model_test_status = self.test_model_on_device(device_id)
            device_summary.append({'ID': device_id, 'Name': device_name, 'Model Test': model_test_status})

        self.logger.info("Device and Model Summary:\n" + tabulate(device_summary, headers="keys", tablefmt="grid"))

    def test_model_on_device(self, device_id):
        try:
            self.load_and_test_model(device_id)
            return 'Success'
        except Exception as e:
            return f'Failure: {e}'

    def load_and_test_model(self, device_id):
        if device_id == -1:
            pass  # CPU test
        else:
            if not torch.cuda.is_available():
                raise RuntimeError("CUDA not available.")
            torch.cuda.set_device(device_id)
            # Additional GPU model loading/testing logic
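For reviewers skimming the new summary output: a minimal sketch of roughly what the `tabulate` call above produces, assuming one CPU and one hypothetical GPU (the device name below is illustrative):

```python
from tabulate import tabulate

# Rows shaped like the dicts built in CUDAAvailable.run()
device_summary = [
    {'ID': -1, 'Name': 'CPU', 'Model Test': 'Success'},
    {'ID': 0, 'Name': 'NVIDIA A10G', 'Model Test': 'Success'},  # hypothetical GPU name
]
print(tabulate(device_summary, headers="keys", tablefmt="grid"))
# Prints roughly:
# +------+-------------+--------------+
# |   ID | Name        | Model Test   |
# +======+=============+==============+
# |   -1 | CPU         | Success      |
# +------+-------------+--------------+
# |    0 | NVIDIA A10G | Success      |
# +------+-------------+--------------+
```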


class SetBestAvailableDevice:
    """Sets the MARQO_BEST_AVAILABLE_DEVICE env var"""
    logger = get_logger('SetBestAvailableDevice')

    def run(self):
        """
        This is set once at startup time. We assume it will NOT change;
        if it does, the health check should throw a warning.
        """
        if torch.cuda.is_available():
            os.environ[EnvVars.MARQO_BEST_AVAILABLE_DEVICE] = "cuda"
        else:
            os.environ[EnvVars.MARQO_BEST_AVAILABLE_DEVICE] = "cpu"

        self.logger.info(f"Best available device set to: {os.environ[EnvVars.MARQO_BEST_AVAILABLE_DEVICE]}")


class CacheModels:
    """Warms the in-memory model cache by preloading good defaults"""
    logger = get_logger('ModelsForStartup')

    def __init__(self):
@@ -133,7 +134,6 @@ def __init__(self):
        try:
            self.models = json.loads(warmed_models)
        except json.JSONDecodeError as e:
            raise exceptions.EnvVarError(
                f"Could not parse environment variable `{EnvVars.MARQO_MODELS_TO_PRELOAD}`. "
                f"Please ensure that this is a JSON-encoded array of strings or dicts. For example:\n"
@@ -142,18 +142,15 @@
            ) from e
        else:
            self.models = warmed_models

        self.default_devices = ['cpu'] if not torch.cuda.is_available() else ['cpu', 'cuda']

        self.logger.info(f"pre-loading {self.models} onto devices={self.default_devices}")

    def run(self):
        test_string = 'this is a test string'
        N = 10
        messages = []
        for model in self.models:
            if isinstance(model, str):
                model_name = model
            elif isinstance(model, dict):
@@ -174,7 +171,6 @@ def run(self):
            for device in self.default_devices:
                self.logger.debug(f"Loading model: {model} on device: {device}")

                _ = _preload_model(model=model, content=test_string, device=device)

                t = 0
@@ -192,6 +188,7 @@
            self.logger.info(message)
        self.logger.info("completed loading models")


class CachePatchModels:
    """Prewarm patch models"""

@@ -235,7 +232,6 @@ def run(self):
self.logger.debug(f"Prewarming model: {model} on device: {device}")
with self.lock:
try:
# Warm it up
chunks = chunk_image(test_image_pil, device=device, method=model)

t = 0
@@ -260,25 +256,13 @@


def _preload_model(model, content, device):
    if isinstance(model, str):
        _ = vectorise(
            model_name=model,
            content=content,
            device=device
        )
    elif isinstance(model, dict):
        try:
            _ = vectorise(
                model_name=model["model"],
@@ -301,7 +285,6 @@ def __init__(self, host: str, port: int):

    def run(self):
        logger.debug('Initializing Redis')
        if utils.read_env_vars_and_defaults(EnvVars.MARQO_ENABLE_THROTTLING) == "TRUE":
            redis_driver.init_from_app(self.host, self.port)
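For reviewers: the gate above means throttling (and the Redis connection behind it) can be disabled via the environment. A small sketch of the check; the `TRUE` default mirrors what the conditional implies but is an assumption here:

```python
import os

def throttling_enabled() -> bool:
    """Throttling is on unless MARQO_ENABLE_THROTTLING is set to something else."""
    return os.environ.get("MARQO_ENABLE_THROTTLING", "TRUE") == "TRUE"

# MARQO_ENABLE_THROTTLING=FALSE skips redis_driver.init_from_app entirely
```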

116 changes: 84 additions & 32 deletions src/marqo/tensor_search/tensor_search.py
@@ -42,6 +42,8 @@
import numpy as np
import psutil
from PIL import Image
from opensearchpy.exceptions import RequestError
from marqo.errors import BadRequestError, InternalError, InvalidArgError

import marqo.core.unstructured_vespa_index.common as unstructured_common
from marqo import marqo_docs
@@ -111,6 +113,13 @@ def add_documents(config: Config, add_docs_params: AddDocsParams):
        raise api_exceptions.InternalError(f"Unknown index type {type(marqo_index)}")


def _add_documents_unstructured(config: Config, add_docs_params: AddDocsParams, marqo_index: UnstructuredMarqoIndex):
    # ADD DOCS TIMER-LOGGER (3)
    vespa_client = config.vespa_client
@@ -142,14 +151,14 @@ def _add_documents_unstructured(config: Config, add_docs_params: AddDocsParams,
    text_chunk_prefix = marqo_index.model.get_text_chunk_prefix(add_docs_params.text_chunk_prefix)

    docs, doc_ids = config.document.remove_duplicated_documents(add_docs_params.docs)

    try:
        with ExitStack() as exit_stack:
            if marqo_index.treat_urls_and_pointers_as_images:
                with RequestMetricsStore.for_request().time(
                        "image_download.full_time",
                        lambda t: logger.debug(
                            f"add_documents image download: took {t:.3f}ms to concurrently download "
                            f"images for {batch_size} docs using {add_docs_params.image_download_thread_count} threads"
                        )
                ):
                    # TODO - Refactor this part to make it more readable
@@ -479,38 +488,65 @@ def _add_documents_unstructured(config: Config, add_docs_params: AddDocsParams,
                copied[constants.MARQO_DOC_ID] = doc_id
                bulk_parent_dicts.append(copied)

            total_preproc_time = 0.001 * RequestMetricsStore.for_request().stop(
                "add_documents.processing_before_vespa")
            logger.debug(
                f" add_documents pre-processing: took {(total_preproc_time):.3f}s total for {batch_size} docs, "
                f"for an average of {(total_preproc_time / batch_size):.3f}s per doc.")

            logger.debug(f" add_documents vectorise: took {(total_vectorise_time):.3f}s for {batch_size} docs, "
                         f"for an average of {(total_vectorise_time / batch_size):.3f}s per doc.")

            if bulk_parent_dicts:
                vespa_docs = [
                    VespaDocument(**unstructured_vespa_index.to_vespa_document(marqo_document=doc))
                    for doc in bulk_parent_dicts
                ]
                # ADD DOCS TIMER-LOGGER (5)
                start_time_5 = timer()
                with RequestMetricsStore.for_request().time("add_documents.vespa._bulk"):
                    index_responses = vespa_client.feed_batch(vespa_docs, marqo_index.schema_name)

                end_time_5 = timer()
                total_http_time = end_time_5 - start_time_5
                logger.debug(
                    f" add_documents roundtrip: took {(total_http_time):.3f}s to send {batch_size} "
                    f"docs (roundtrip) to vector store, "
                    f"for an average of {(total_http_time / batch_size):.3f}s per doc.")
            else:
                index_responses = None

            with RequestMetricsStore.for_request().time("add_documents.postprocess"):
                t1 = timer()
    except RequestError as e:
        if e.status_code == 400:
            raise BadRequestError("OpenSearch returned a 400 error.") from e
        else:
            raise
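A note on the new handler: opensearchpy's `RequestError` exposes the HTTP status as `status_code`, so the branch can be unit-tested without a live cluster. A sketch, assuming opensearch-py's `(status, error, info)` constructor convention:

```python
from opensearchpy.exceptions import RequestError

def classify(e: RequestError) -> str:
    # Mirrors the handler above: 400 becomes a user-facing BadRequestError, anything else re-raises
    return "BadRequestError" if e.status_code == 400 else "re-raise"

print(classify(RequestError(400, "parsing_exception", {})))  # -> BadRequestError
```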

    if len(unsuccessful_docs) > 0:
        raise BadRequestError(
            f"{len(unsuccessful_docs)} document(s) failed to be added. First error: "
            f"{unsuccessful_docs[0][1]['error']}"
        )

    try:
        if len(bulk_parent_dicts) > 0:
            logger.info(f"Adding {len(bulk_parent_dicts)} documents")
            unstructured_index = UnstructuredVespaIndex(marqo_index)
            vespa_client.add_documents(unstructured_index, bulk_parent_dicts)

            logger.info("Documents added successfully")

            if marqo_index.is_refreshing and marqo_index.refresh_interval is not None:
                vespa_client.refresh_index(unstructured_index)
    except RequestError as e:
        if e.status_code == 400:
            raise BadRequestError("OpenSearch returned a 400 error during document addition.") from e
        else:
            raise

    def translate_add_doc_response(responses: Optional[FeedBatchResponse], time_diff: float) -> dict:
        """translates Vespa response dict into Marqo dict"""
        result_dict = {}
        new_items = []
@@ -538,7 +574,23 @@ def translate_add_doc_response(responses: Optional[FeedBatchResponse], time_diff

        return result_dict

    return translate_add_doc_response(index_responses, time_diff=t1 - t0)
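The helper body is mostly collapsed in this view; as a reading aid, here is a minimal, hypothetical sketch of the Vespa-to-Marqo translation it performs. The field names and status handling below are illustrative, not the real `FeedBatchResponse` schema:

```python
from typing import List, Optional, Tuple

def translate_sketch(statuses: Optional[List[Tuple[str, int]]], time_diff: float) -> dict:
    """statuses: assumed (doc_id, http_status) pairs from the feed batch."""
    items = [{"_id": doc_id, "status": status} for doc_id, status in (statuses or [])]
    return {
        "errors": any(item["status"] >= 400 for item in items),
        "processingTimeMs": time_diff * 1000,
        "items": items,
    }
```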
def _add_documents_structured(config: Config, add_docs_params: AddDocsParams, marqo_index: StructuredMarqoIndex):
@@ -2252,4 +2304,4 @@ def delete_documents(config: Config, index_name: str, doc_ids: List[str]):
            schema_name=marqo_index.schema_name,
            document_ids=doc_ids,
        )
    )