diff --git a/benchmark/dataset.py b/benchmark/dataset.py
index ef006955..96c78fab 100644
--- a/benchmark/dataset.py
+++ b/benchmark/dataset.py
@@ -34,6 +34,12 @@ class DatasetConfig:
 }
 
 
+# prepare progressbar
+def show_progress(block_num, block_size, total_size):
+    percent = round(block_num * block_size / total_size * 100, 2)
+    print(f"{percent} %", end="\r")
+
+
 class Dataset:
     def __init__(self, config: dict):
         self.config = DatasetConfig(**config)
@@ -47,7 +53,9 @@ def download(self):
 
         if self.config.link:
             print(f"Downloading {self.config.link}...")
-            tmp_path, _ = urllib.request.urlretrieve(self.config.link)
+            tmp_path, _ = urllib.request.urlretrieve(
+                self.config.link, None, show_progress
+            )
 
             if self.config.link.endswith(".tgz") or self.config.link.endswith(
                 ".tar.gz"
diff --git a/engine/base_client/client.py b/engine/base_client/client.py
index def2f53b..8e30a2ee 100644
--- a/engine/base_client/client.py
+++ b/engine/base_client/client.py
@@ -84,6 +84,7 @@ def run_experiment(
         skip_upload: bool = False,
         skip_search: bool = False,
         skip_if_exists: bool = True,
+        parallels: [int] = [],
     ):
         execution_params = self.configurator.execution_params(
             distance=dataset.config.distance, vector_size=dataset.config.vector_size
@@ -125,7 +126,6 @@ def run_experiment(
         if not skip_search:
             print("Experiment stage: Search")
             for search_id, searcher in enumerate(self.searchers):
-
                 if skip_if_exists:
                     glob_pattern = (
                         f"{self.name}-{dataset.config.name}-search-{search_id}-*.json"
@@ -139,9 +139,19 @@ def run_experiment(
                         continue
 
                 search_params = {**searcher.search_params}
+                ef = search_params.get("search_params", {}).get("ef", "default")
+                client_count = search_params.get("parallel", 1)
+                filter_client_count = len(parallels) > 0
+                if filter_client_count and (client_count not in parallels):
+                    print(f"\tSkipping ef runtime: {ef}; #clients {client_count}")
+                    continue
+                print(f"\tRunning ef runtime: {ef}; #clients {client_count}")
+
                 search_stats = searcher.search_all(
                     dataset.config.distance, reader.read_queries()
                 )
+                # ensure we specify the client count in the results
+                search_params["parallel"] = client_count
                 if not DETAILED_RESULTS:
                     # Remove verbose stats from search results
                     search_stats.pop("latencies", None)
@@ -150,6 +160,7 @@ def run_experiment(
                 self.save_search_results(
                     dataset.config.name, search_stats, search_id, search_params
                 )
+        print("Experiment stage: Done")
         print("Results saved to: ", RESULTS_DIR)
 
 
diff --git a/engine/base_client/search.py b/engine/base_client/search.py
index 3626191e..7645f836 100644
--- a/engine/base_client/search.py
+++ b/engine/base_client/search.py
@@ -50,7 +50,6 @@ def _search_one(cls, query: Query, top: Optional[int] = None):
         if query.expected_result:
             ids = set(x[0] for x in search_res)
             precision = len(ids.intersection(query.expected_result[:top])) / top
-
         return precision, end - start
 
     def search_all(
@@ -58,9 +57,8 @@ def search_all(
         distance,
         queries: Iterable[Query],
     ):
-        parallel = self.search_params.get("parallel", 1)
-        top = self.search_params.get("top", None)
-
+        parallel = self.search_params.pop("parallel", 1)
+        top = self.search_params.pop("top", None)
         # setup_search may require initialized client
         self.init_client(
             self.host, distance, self.connection_params, self.search_params
         )
@@ -106,6 +104,7 @@ def search_all(
             "min_time": np.min(latencies),
             "max_time": np.max(latencies),
             "rps": len(latencies) / total_time,
+            "p50_time": np.percentile(latencies, 50),
             "p95_time": np.percentile(latencies, 95),
             "p99_time": np.percentile(latencies, 99),
             "precisions": precisions,
diff --git a/engine/base_client/upload.py b/engine/base_client/upload.py
index 55ee4055..ade19fff 100644
--- a/engine/base_client/upload.py
+++ b/engine/base_client/upload.py
@@ -53,12 +53,15 @@ def upload(
                 self.upload_params,
             ),
         ) as pool:
-            latencies = list(
-                pool.imap(
-                    self.__class__._upload_batch,
-                    iter_batches(tqdm.tqdm(records), batch_size),
+            try:
+                latencies = list(
+                    pool.imap(
+                        self.__class__._upload_batch,
+                        iter_batches(tqdm.tqdm(records), batch_size),
+                    )
                 )
-            )
+            except Exception as e:
+                raise e
 
         upload_time = time.perf_counter() - start
 
@@ -77,6 +80,8 @@ def upload(
             "upload_time": upload_time,
             "total_time": total_time,
             "latencies": latencies,
+            "parallel": parallel,
+            "batch_size": batch_size,
         }
 
     @classmethod