From 2815295fb73bc96def4e1450000b2c2e5624b785 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= Date: Fri, 20 Sep 2024 15:59:56 +0200 Subject: [PATCH 01/14] first retry version --- modyn/evaluator/internal/pytorch_evaluator.py | 1 - .../pipeline_executor/evaluation_executor.py | 150 +++++++++++------- .../internal/test_pytorch_evaluator.py | 2 - 3 files changed, 89 insertions(+), 64 deletions(-) diff --git a/modyn/evaluator/internal/pytorch_evaluator.py b/modyn/evaluator/internal/pytorch_evaluator.py index 1abe14c80..b3ad49ce8 100644 --- a/modyn/evaluator/internal/pytorch_evaluator.py +++ b/modyn/evaluator/internal/pytorch_evaluator.py @@ -39,7 +39,6 @@ def __init__( ) self._device = evaluation_info.device - self._device_type = "cuda" if "cuda" in self._device else "cpu" self._amp = evaluation_info.amp self._info("Initialized PyTorch evaluator.") diff --git a/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py b/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py index d4592999a..cb4bfd4c3 100644 --- a/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py +++ b/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py @@ -339,80 +339,108 @@ def _single_batched_evaluation( self.pipeline.evaluation.device, intervals=cast(list[tuple[int | None, int | None]], intervals), ) + + def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str: + return EvaluationAbortedReason.DESCRIPTOR.values_by_number[eval_aborted_reason].name + for attempt in Retrying( - stop=stop_after_attempt(5), - wait=wait_random_exponential(multiplier=1, min=2, max=60), + stop=stop_after_attempt(10), + wait=wait_random_exponential(multiplier=1, min=2, max=120), reraise=True, ): with attempt: try: response: EvaluateModelResponse = self.grpc.evaluator.evaluate_model(request) - except grpc.RpcError as e: # We catch and reraise to reconnect + except grpc.RpcError as e: # We catch and reraise them to tenacity after reconnecting logger.error(e) logger.error("gRPC connection error, trying to reconnect...") self.grpc.init_evaluator() raise e - assert len(response.interval_responses) == len( - intervals - ), f"We expected {len(intervals)} intervals, but got {len(response.interval_responses)}." - - def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str: - return EvaluationAbortedReason.DESCRIPTOR.values_by_number[eval_aborted_reason].name - - if not response.evaluation_started: - failure_reasons: list[tuple[str | None, dict]] = [] - # note: interval indexes correspond to the intervals in the request - for interval_idx, interval_response in enumerate(response.interval_responses): - if interval_response.eval_aborted_reason != EvaluationAbortedReason.NOT_ABORTED: - reason = get_failure_reason(interval_response.eval_aborted_reason) - failure_reasons.append((reason, {})) - logger.error( - f"Evaluation for model {model_id_to_eval} on split {intervals[interval_idx]} " - f"not started with reason: {reason}." - ) - return failure_reasons - - logger.info(f"Evaluation started for model {model_id_to_eval} on intervals {intervals}.") - self.grpc.wait_for_evaluation_completion(response.evaluation_id) - eval_data = self.grpc.get_evaluation_results(response.evaluation_id) - self.grpc.cleanup_evaluations([response.evaluation_id]) - - eval_results: list[tuple[str | None, dict[str, Any]]] = [] - - # ---------------------------------------------- Result Builder ---------------------------------------------- # - # The `eval_results` list is a list of tuples. Each tuple contains a failure reason (if any) and a dictionary - # with the evaluation results. The order of the tuples corresponds to the order of the intervals. - # - # response.interval_responses contains the evaluation results for each interval in the same order as the - # intervals in the request. Failed evaluations are marked with a failure reason. - - # Metric results come from the `EvaluateModelResponse` and are stored in the `evaluation_data` field. This - # only contains the metrics for the intervals that were successfully evaluated. - # - # Therefore we first build a list of results with the same order as the intervals. The metrics will be filled in - # the next loop that unwraps `EvaluationResultResponse`. - # ----------------------------------------------------- . ---------------------------------------------------- # - - for interval_response in response.interval_responses: - if interval_response.eval_aborted_reason != EvaluationAbortedReason.NOT_ABORTED: - reason = get_failure_reason(interval_response.eval_aborted_reason) - eval_results.append((reason, {})) - else: - eval_results.append( - ( - None, - {"dataset_size": interval_response.dataset_size, "metrics": []}, + assert len(response.interval_responses) == len( + intervals + ), f"We expected {len(intervals)} intervals, but got {len(response.interval_responses)}." + + if not response.evaluation_started: + failure_reasons: list[tuple[str | None, dict]] = [] + # note: interval indexes correspond to the intervals in the request + for interval_idx, interval_response in enumerate(response.interval_responses): + if interval_response.eval_aborted_reason != EvaluationAbortedReason.NOT_ABORTED: + reason = get_failure_reason(interval_response.eval_aborted_reason) + failure_reasons.append((reason, {})) + logger.error( + f"Evaluation for model {model_id_to_eval} on split {intervals[interval_idx]} " + f"not started with reason: {reason}." + ) + # No retrying here, if we were to retry it should be done at the evaluator + # This is likely due to an invalid request in the first place. + return failure_reasons + + logger.info(f"Evaluation started for model {model_id_to_eval} on intervals {intervals}.") + self.grpc.wait_for_evaluation_completion(response.evaluation_id) + eval_data = self.grpc.get_evaluation_results(response.evaluation_id) + self.grpc.cleanup_evaluations([response.evaluation_id]) + + eval_results: list[tuple[str | None, dict[str, Any]]] = [] + + # ---------------------------------------------- Result Builder ---------------------------------------------- # + # The `eval_results` list is a list of tuples. Each tuple contains a failure reason (if any) and a dictionary + # with the evaluation results. The order of the tuples corresponds to the order of the intervals. + # + # response.interval_responses contains the evaluation results for each interval in the same order as the + # intervals in the request. Failed evaluations are marked with a failure reason. + + # Metric results come from the `EvaluateModelResponse` and are stored in the `evaluation_data` field. This + # only contains the metrics for the intervals that were successfully evaluated. + # + # Therefore we first build a list of results with the same order as the intervals. The metrics will be filled in + # the next loop that unwraps `EvaluationResultResponse`. + # ----------------------------------------------------- . ---------------------------------------------------- # + + for interval_response in response.interval_responses: + if interval_response.eval_aborted_reason != EvaluationAbortedReason.NOT_ABORTED: + reason = get_failure_reason(interval_response.eval_aborted_reason) + eval_results.append((reason, {})) + else: + eval_results.append( + ( + None, + {"dataset_size": interval_response.dataset_size, "metrics": []}, + ) + ) + + for interval_result in eval_data: + interval_idx = interval_result.interval_index + assert ( + eval_results[interval_idx][0] is None + ), "Evaluation failed, this interval idx should not be returned by evaluator." + eval_results[interval_idx][1]["metrics"] = [ + {"name": metric.metric, "result": metric.result} for metric in interval_result.evaluation_data + ] + + # Assert that all evaluated intervals have all metrics + # Will trigger a retry in case this is not successful + # Can happen, e.g., if the evaluator is overloaded + expected_num_metrics = len(request.metrics) + for abort_reason, data_dict in eval_results: + if abort_reason is not None: + # If there was any reason to abort, we don't care + continue + + assert ( + data_dict["dataset_size"] > 0 + ), f"dataset size of 0, but no EMPTY_INTERVAL response: {eval_results}" + actual_num_metrics = len(data_dict["metrics"]) + assert actual_num_metrics == expected_num_metrics, ( + f"actual_num_metrics = {actual_num_metrics}" + + f" != expected_num_metrics = {expected_num_metrics}" + + "\n" + + str(eval_results) ) - ) - for interval_result in eval_data: - interval_idx = interval_result.interval_index - assert eval_results[interval_idx][0] is None, "Evaluation failed, no metrics should be present." - eval_results[interval_idx][1]["metrics"] = [ - {"name": metric.metric, "result": metric.result} for metric in interval_result.evaluation_data - ] - return eval_results + return eval_results + + raise RuntimeError("Unreachable code - satisfy mypy.") # ------------------------------------------------------------------------------------ # diff --git a/modyn/tests/evaluator/internal/test_pytorch_evaluator.py b/modyn/tests/evaluator/internal/test_pytorch_evaluator.py index 211137d0f..11d60adfb 100644 --- a/modyn/tests/evaluator/internal/test_pytorch_evaluator.py +++ b/modyn/tests/evaluator/internal/test_pytorch_evaluator.py @@ -170,7 +170,6 @@ def test_evaluator_init(load_state_mock: MagicMock) -> None: ) ) assert evaluator._device == "cpu" - assert evaluator._device_type == "cpu" assert not evaluator._amp load_state_mock.assert_called_once_with(pathlib.Path("trained_model.modyn")) @@ -183,7 +182,6 @@ def test_no_transform_evaluator_init(load_state_mock: MagicMock): assert isinstance(evaluator._model.model, MockModel) assert not evaluator._label_transformer_function assert evaluator._device == "cpu" - assert evaluator._device_type == "cpu" assert not evaluator._amp load_state_mock.assert_called_once_with(pathlib.Path("trained_model.modyn")) From 2fee48bb9ff1a4b5d121920213d0f63f5fadb64a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= Date: Fri, 20 Sep 2024 16:05:51 +0200 Subject: [PATCH 02/14] improve check at evaluator --- .../evaluator/internal/grpc/evaluator_grpc_servicer.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py b/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py index d98bce52a..004bf9cae 100644 --- a/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py +++ b/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py @@ -317,12 +317,16 @@ def get_evaluation_result( single_eval_data = EvaluationIntervalData(interval_index=interval_idx, evaluation_data=metric_result) evaluation_data.append(single_eval_data) - if len(evaluation_data) < len(self._evaluation_dict[evaluation_id].not_failed_interval_ids): + num_metrics = len(self._evaluation_dict[evaluation_id].raw_metrics) + expected_results = len(self._evaluation_dict[evaluation_id].not_failed_interval_ids) * num_metrics + if len(evaluation_data) < expected_results: logger.error( f"Could not retrieve results for all intervals of evaluation {evaluation_id}. " - f"Expected {len(self._evaluation_dict[evaluation_id].not_failed_interval_ids)}, " - f"but got {len(evaluation_data)}. Maybe an exception happened during evaluation." + f"Expected {len(self._evaluation_dict[evaluation_id].not_failed_interval_ids)} * {num_metrics} = {expected_results} results, " + f"but got {len(evaluation_data)} results. Most likely, an exception happened during evaluation." ) + return EvaluationResultResponse(valid=False) + return EvaluationResultResponse(valid=True, evaluation_results=evaluation_data) def cleanup_evaluations( From 4e3564d6cff5b66bee2fd3e22ad4fc31f88aa614 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= Date: Fri, 20 Sep 2024 16:23:45 +0200 Subject: [PATCH 03/14] retry on exception at evaluator --- modyn/supervisor/internal/grpc_handler.py | 50 +++++++++++++++---- .../pipeline_executor/evaluation_executor.py | 24 +++++++-- 2 files changed, 59 insertions(+), 15 deletions(-) diff --git a/modyn/supervisor/internal/grpc_handler.py b/modyn/supervisor/internal/grpc_handler.py index 9f6330220..44b7da2b2 100644 --- a/modyn/supervisor/internal/grpc_handler.py +++ b/modyn/supervisor/internal/grpc_handler.py @@ -302,28 +302,46 @@ def wait_for_evaluation_completion(self, evaluation_id: int) -> bool: self.init_evaluator() raise e + # NOT within retry block if not res.valid: - exception_msg = f"Evaluation {evaluation_id} is invalid at server:\n{res}\n" - logger.error(exception_msg) - raise RuntimeError(exception_msg) + # Should only happen when requesting invalid id, hence we throw + _msg = f"Evaluation {evaluation_id} is invalid at server:\n{res}\n" + logger.error(_msg) + raise RuntimeError(_msg) if res.HasField("exception"): - exception_msg = f"Exception at evaluator occurred:\n{res.exception}\n\n" - logger.error(exception_msg) + logger.error(f"Exception at evaluator occurred:\n{res.exception}\n\n") + self.cleanup_evaluations([evaluation_id]) + logger.error(f"Performed cleanup for evaluation {evaluation_id} that threw exception.") has_exception = True - break + break # Exit busy wait + if not res.is_running: - break + break # Exit busy wait + sleep(1) + return not has_exception def get_evaluation_results(self, evaluation_id: int) -> list[EvaluationIntervalData]: assert self.evaluator is not None if not self.connected_to_evaluator: raise ConnectionError("Tried to wait for evaluation to finish, but not there is no gRPC connection.") - req = EvaluationResultRequest(evaluation_id=evaluation_id) - res: EvaluationResultResponse = self.evaluator.get_evaluation_result(req) + + for attempt in Retrying( + stop=stop_after_attempt(5), + wait=wait_random_exponential(multiplier=1, min=2, max=60), + reraise=True, + ): + with attempt: + try: + res: EvaluationResultResponse = self.evaluator.get_evaluation_result(req) + except grpc.RpcError as e: # We catch and reraise to easily reconnect + logger.error(e) + logger.error(f"[Evaluation {evaluation_id}]: gRPC connection error, trying to reconnect.") + self.init_evaluator() + raise e if not res.valid: logger.error(f"Cannot get the evaluation result for evaluation {evaluation_id}") @@ -334,7 +352,19 @@ def cleanup_evaluations(self, evaluation_ids: list[int]) -> None: assert self.evaluator is not None req = EvaluationCleanupRequest(evaluation_ids=set(evaluation_ids)) - res: EvaluationCleanupResponse = self.evaluator.cleanup_evaluations(req) + for attempt in Retrying( + stop=stop_after_attempt(5), + wait=wait_random_exponential(multiplier=1, min=2, max=60), + reraise=True, + ): + with attempt: + try: + res: EvaluationCleanupResponse = self.evaluator.cleanup_evaluations(req) + except grpc.RpcError as e: # We catch and reraise to easily reconnect + logger.error(e) + logger.error(f"[Evaluations {evaluation_ids}]: gRPC connection error, trying to reconnect.") + self.init_evaluator() + raise e failed = set(evaluation_ids) - {int(i) for i in res.succeeded} if failed: diff --git a/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py b/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py index cb4bfd4c3..81cf7b49d 100644 --- a/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py +++ b/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py @@ -343,9 +343,11 @@ def _single_batched_evaluation( def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str: return EvaluationAbortedReason.DESCRIPTOR.values_by_number[eval_aborted_reason].name + started_evaluations = [] + for attempt in Retrying( stop=stop_after_attempt(10), - wait=wait_random_exponential(multiplier=1, min=2, max=120), + wait=wait_random_exponential(multiplier=2, min=2, max=180), reraise=True, ): with attempt: @@ -377,9 +379,17 @@ def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str: return failure_reasons logger.info(f"Evaluation started for model {model_id_to_eval} on intervals {intervals}.") - self.grpc.wait_for_evaluation_completion(response.evaluation_id) - eval_data = self.grpc.get_evaluation_results(response.evaluation_id) - self.grpc.cleanup_evaluations([response.evaluation_id]) + started_evaluations.append(response.evaluation_id) + if not self.grpc.wait_for_evaluation_completion(response.evaluation_id): + raise RuntimeError("There was an exception during evaluation") # Trigger retry + + eval_data = self.grpc.get_evaluation_results( + response.evaluation_id + ) # Will throw in case of invalid result + + self.grpc.cleanup_evaluations( + [response.evaluation_id] + ) # Early cleanup if succeeded since we have the data eval_results: list[tuple[str | None, dict[str, Any]]] = [] @@ -397,6 +407,7 @@ def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str: # the next loop that unwraps `EvaluationResultResponse`. # ----------------------------------------------------- . ---------------------------------------------------- # + # Prepare eval_results structure for interval_response in response.interval_responses: if interval_response.eval_aborted_reason != EvaluationAbortedReason.NOT_ABORTED: reason = get_failure_reason(interval_response.eval_aborted_reason) @@ -409,6 +420,7 @@ def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str: ) ) + # Parse response into eval_results structure for interval_result in eval_data: interval_idx = interval_result.interval_index assert ( @@ -438,9 +450,11 @@ def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str: + str(eval_results) ) + # All checks succeeded + self.grpc.cleanup_evaluations(started_evaluations) # Make sure to clean up everything we started. return eval_results - raise RuntimeError("Unreachable code - satisfy mypy.") + raise RuntimeError("Unreachable code - just to satisfy mypy.") # ------------------------------------------------------------------------------------ # From b5d28cddff1437d2993e0034ca1fd222b47d0d14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= Date: Fri, 20 Sep 2024 16:24:54 +0200 Subject: [PATCH 04/14] minor --- modyn/supervisor/internal/grpc_handler.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/modyn/supervisor/internal/grpc_handler.py b/modyn/supervisor/internal/grpc_handler.py index 44b7da2b2..a5544ba25 100644 --- a/modyn/supervisor/internal/grpc_handler.py +++ b/modyn/supervisor/internal/grpc_handler.py @@ -344,8 +344,10 @@ def get_evaluation_results(self, evaluation_id: int) -> list[EvaluationIntervalD raise e if not res.valid: - logger.error(f"Cannot get the evaluation result for evaluation {evaluation_id}") - raise RuntimeError(f"Cannot get the evaluation result for evaluation {evaluation_id}") + _msg = f"Cannot get the evaluation result for evaluation {evaluation_id}" + logger.error(_msg) + raise RuntimeError(_msg) + return res.evaluation_results def cleanup_evaluations(self, evaluation_ids: list[int]) -> None: From 5cd01316451fc6da1886cf3721a630a771ccf192 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= Date: Mon, 23 Sep 2024 21:40:44 +0800 Subject: [PATCH 05/14] find invalid runs notebook --- analytics/tools/find_invalid_runs.ipynb | 126 ++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 analytics/tools/find_invalid_runs.ipynb diff --git a/analytics/tools/find_invalid_runs.ipynb b/analytics/tools/find_invalid_runs.ipynb new file mode 100644 index 000000000..e76e226c6 --- /dev/null +++ b/analytics/tools/find_invalid_runs.ipynb @@ -0,0 +1,126 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Motivation\n", + "\n", + "This notebook can be used to find pipeline runs where we have empty evaluation responses despite expecting some. Older versions of Modyn have lest robustness in the evaluation handling." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pathlib import Path\n", + "\n", + "from tqdm import tqdm\n", + "\n", + "from modyn.supervisor.internal.grpc.enums import PipelineStage\n", + "from modyn.supervisor.internal.pipeline_executor.models import MultiEvaluationInfo, PipelineLogs, SingleEvaluationInfo\n", + "\n", + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "log_dir = Path(\"/Users/mboether/phd/dynamic-data/sigmod-data/yearbook/debug/logs\")\n", + "logfiles = [logfile for logfile in log_dir.glob(\"**/pipeline.log\")]\n", + "logfiles" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def metrics_valid(logfile: Path):\n", + " logs = PipelineLogs.model_validate_json(logfile.read_text())\n", + " for eval_log in logs.supervisor_logs.stage_runs:\n", + " if eval_log.id == PipelineStage.EVALUATE_MULTI.name:\n", + " multiinfo = eval_log.info\n", + " assert isinstance(multiinfo, MultiEvaluationInfo)\n", + "\n", + " for info in multiinfo.interval_results:\n", + " assert isinstance(info, SingleEvaluationInfo)\n", + " res = info.results\n", + "\n", + " if len(res[\"metrics\"]) == 0:\n", + " if res[\"dataset_size\"] == 0:\n", + " print(\n", + " f\"Warning: Empty metrics but empty dataset in {logfile}: {info}\"\n", + " ) # Might want to remove this - not sure if needed.\n", + " else:\n", + " return False\n", + "\n", + " return True" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "invalid_pipelines = []\n", + "for logfile in tqdm(logfiles):\n", + " if not metrics_valid(logfile):\n", + " invalid_pipelines.append(logfile)\n", + "\n", + "invalid_pipelines\n", + "\n", + "# Typically, you'd want to delete those directories because they are invalid (see next cell)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Commented out for safety\n", + "\n", + "\"\"\"\n", + "import shutil\n", + "parent_dirs = {file_path.parent for file_path in invalid_pipelines}\n", + "\n", + "for directory in parent_dirs:\n", + " try:\n", + " shutil.rmtree(directory)\n", + " except Exception as e:\n", + " print(f\"Failed to delete {directory}: {e}\")\n", + "\"\"\"" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.8" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} From 5f23bc8e8496a3a44c09c4172bd3470b73f96398 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= Date: Mon, 23 Sep 2024 22:41:41 +0800 Subject: [PATCH 06/14] Some fixes --- .../internal/grpc/evaluator_grpc_servicer.py | 11 +++++------ modyn/supervisor/internal/grpc_handler.py | 3 +++ .../internal/pipeline_executor/evaluation_executor.py | 4 +++- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py b/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py index 004bf9cae..4db4486bb 100644 --- a/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py +++ b/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py @@ -299,16 +299,16 @@ def get_evaluation_result( logger.info(f"Received get evaluation result request for evaluation {evaluation_id}.") if evaluation_id not in self._evaluation_dict: - logger.error(f"Evaluation with id {evaluation_id} has not been registered.") + logger.error(f"Evaluation {evaluation_id} has not been registered.") return EvaluationResultResponse(valid=False) self._drain_result_queue(evaluation_id) # Should already be drained, but just make sure if self._evaluation_process_dict[evaluation_id].process_handler.is_alive(): - logger.error(f"Evaluation with id {evaluation_id} is still running.") + logger.error(f"Evaluation {evaluation_id} is still running.") return EvaluationResultResponse(valid=False) - logger.info("Returning results of all metrics.") + logger.info(f"[Evaluation {evaluation_id}] Returning results of all metrics.") self._drain_result_queue(evaluation_id) # Should not do anything, but let's make sure evaluation_data: list[EvaluationIntervalData] = [] @@ -317,12 +317,11 @@ def get_evaluation_result( single_eval_data = EvaluationIntervalData(interval_index=interval_idx, evaluation_data=metric_result) evaluation_data.append(single_eval_data) - num_metrics = len(self._evaluation_dict[evaluation_id].raw_metrics) - expected_results = len(self._evaluation_dict[evaluation_id].not_failed_interval_ids) * num_metrics + expected_results = len(self._evaluation_dict[evaluation_id].not_failed_interval_ids) if len(evaluation_data) < expected_results: logger.error( f"Could not retrieve results for all intervals of evaluation {evaluation_id}. " - f"Expected {len(self._evaluation_dict[evaluation_id].not_failed_interval_ids)} * {num_metrics} = {expected_results} results, " + f"Expected {expected_results} results, " f"but got {len(evaluation_data)} results. Most likely, an exception happened during evaluation." ) return EvaluationResultResponse(valid=False) diff --git a/modyn/supervisor/internal/grpc_handler.py b/modyn/supervisor/internal/grpc_handler.py index a5544ba25..b2d98c1af 100644 --- a/modyn/supervisor/internal/grpc_handler.py +++ b/modyn/supervisor/internal/grpc_handler.py @@ -317,6 +317,7 @@ def wait_for_evaluation_completion(self, evaluation_id: int) -> bool: break # Exit busy wait if not res.is_running: + logger.info(f"Evaluation {evaluation_id} has finished successfully.") break # Exit busy wait sleep(1) @@ -348,6 +349,8 @@ def get_evaluation_results(self, evaluation_id: int) -> list[EvaluationIntervalD logger.error(_msg) raise RuntimeError(_msg) + logger.debug(f"Obtained evaluation results for evaluation {evaluation_id}") + return res.evaluation_results def cleanup_evaluations(self, evaluation_ids: list[int]) -> None: diff --git a/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py b/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py index 81cf7b49d..cb87fac43 100644 --- a/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py +++ b/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py @@ -378,7 +378,9 @@ def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str: # This is likely due to an invalid request in the first place. return failure_reasons - logger.info(f"Evaluation started for model {model_id_to_eval} on intervals {intervals}.") + logger.info( + f"Evaluation {response.evaluation_id} started for model {model_id_to_eval} on intervals {intervals}." + ) started_evaluations.append(response.evaluation_id) if not self.grpc.wait_for_evaluation_completion(response.evaluation_id): raise RuntimeError("There was an exception during evaluation") # Trigger retry From e35627327f1f7298125524d5ca8daf4b48d29941 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= Date: Mon, 23 Sep 2024 22:50:03 +0800 Subject: [PATCH 07/14] fix some evaluator tets to new valid meaning --- .../evaluator/internal/grpc/test_evaluator_grpc_servicer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modyn/tests/evaluator/internal/grpc/test_evaluator_grpc_servicer.py b/modyn/tests/evaluator/internal/grpc/test_evaluator_grpc_servicer.py index 1be7ea675..91a7fb77c 100644 --- a/modyn/tests/evaluator/internal/grpc/test_evaluator_grpc_servicer.py +++ b/modyn/tests/evaluator/internal/grpc/test_evaluator_grpc_servicer.py @@ -485,7 +485,7 @@ def test__run_evaluation_retain_metrics_before_real_exception(test_connect_to_st get_result_req = EvaluationResultRequest(evaluation_id=evaluation_id) get_result_resp = evaluator.get_evaluation_result(get_result_req, None) - assert get_result_resp.valid + assert not get_result_resp.valid # evaluation on the last interval was not finished assert len(get_result_resp.evaluation_results) == len(intervals) - 1 assert get_result_resp.evaluation_results[0].interval_index == 0 @@ -535,7 +535,7 @@ def test_get_evaluation_result_incomplete_metric(test_is_alive, test_connect_to_ metric_result_queue = evaluation_process_info.metric_result_queue metric_result_queue.put((1, [("Accuracy", 0.5)])) response = evaluator.get_evaluation_result(EvaluationResultRequest(evaluation_id=3), None) - assert response.valid + assert not response.valid assert len(response.evaluation_results) == 1 assert response.evaluation_results[0].interval_index == 1 assert len(response.evaluation_results[0].evaluation_data) == 1 From 9f5178ff0ce6d1b7b9c39e74ca906f29e88fd9f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= Date: Tue, 24 Sep 2024 10:38:10 +0800 Subject: [PATCH 08/14] only pop if in there to avoid exception --- modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py b/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py index 4db4486bb..d92d064b3 100644 --- a/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py +++ b/modyn/evaluator/internal/grpc/evaluator_grpc_servicer.py @@ -353,9 +353,12 @@ def cleanup_evaluations( self._evaluation_process_dict.pop(evaluation_id) for e_id in evaluation_ids: - self._evaluation_dict.pop(e_id) - self._evaluation_data_dict.pop(e_id) - self._evaluation_data_dict_locks.pop(e_id) + if e_id in self._evaluation_dict: + self._evaluation_dict.pop(e_id) + if e_id in self._evaluation_data_dict: + self._evaluation_data_dict.pop(e_id) + if e_id in self._evaluation_data_dict_locks: + self._evaluation_data_dict_locks.pop(e_id) gc.collect() return EvaluationCleanupResponse(succeeded=list(sorted(already_cleaned + not_yet_cleaned))) From 6cd49e1d3344bb9491ee585ee6290e7f5b44be8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= Date: Tue, 24 Sep 2024 23:19:49 +0800 Subject: [PATCH 09/14] fix more tests --- .../grpc/test_evaluator_grpc_servicer.py | 13 ------- .../test_evaluation_executor.py | 7 +++- .../supervisor/internal/test_grpc_handler.py | 36 ++++++++----------- 3 files changed, 20 insertions(+), 36 deletions(-) diff --git a/modyn/tests/evaluator/internal/grpc/test_evaluator_grpc_servicer.py b/modyn/tests/evaluator/internal/grpc/test_evaluator_grpc_servicer.py index 91a7fb77c..0dffb3a1d 100644 --- a/modyn/tests/evaluator/internal/grpc/test_evaluator_grpc_servicer.py +++ b/modyn/tests/evaluator/internal/grpc/test_evaluator_grpc_servicer.py @@ -486,14 +486,6 @@ def test__run_evaluation_retain_metrics_before_real_exception(test_connect_to_st get_result_req = EvaluationResultRequest(evaluation_id=evaluation_id) get_result_resp = evaluator.get_evaluation_result(get_result_req, None) assert not get_result_resp.valid - # evaluation on the last interval was not finished - assert len(get_result_resp.evaluation_results) == len(intervals) - 1 - assert get_result_resp.evaluation_results[0].interval_index == 0 - assert len(get_result_resp.evaluation_results[0].evaluation_data) == 2 - assert get_result_resp.evaluation_results[0].evaluation_data[0].metric == "Accuracy" - assert get_result_resp.evaluation_results[0].evaluation_data[0].result == pytest.approx(0.5) - assert get_result_resp.evaluation_results[0].evaluation_data[1].metric == "F1Score" - assert get_result_resp.evaluation_results[0].evaluation_data[1].result == pytest.approx(0.6) @patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub()) @@ -536,11 +528,6 @@ def test_get_evaluation_result_incomplete_metric(test_is_alive, test_connect_to_ metric_result_queue.put((1, [("Accuracy", 0.5)])) response = evaluator.get_evaluation_result(EvaluationResultRequest(evaluation_id=3), None) assert not response.valid - assert len(response.evaluation_results) == 1 - assert response.evaluation_results[0].interval_index == 1 - assert len(response.evaluation_results[0].evaluation_data) == 1 - assert response.evaluation_results[0].evaluation_data[0].result == pytest.approx(0.5) - assert response.evaluation_results[0].evaluation_data[0].metric == "Accuracy" @patch.object(EvaluatorGRPCServicer, "connect_to_storage", return_value=DummyStorageStub()) diff --git a/modyn/tests/supervisor/internal/pipeline_executor/test_evaluation_executor.py b/modyn/tests/supervisor/internal/pipeline_executor/test_evaluation_executor.py index 2e082d428..9f6a9dc9d 100644 --- a/modyn/tests/supervisor/internal/pipeline_executor/test_evaluation_executor.py +++ b/modyn/tests/supervisor/internal/pipeline_executor/test_evaluation_executor.py @@ -299,6 +299,11 @@ def test_single_batched_evaluation_mixed( eval_req2 = dummy_eval_request() eval_req.interval_start = 64 eval_req.interval_end = 14 + + request = MagicMock() + request.metrics = ["Accuracy"] + test_prepare_evaluation_request.return_value = request + results = evaluation_executor._single_batched_evaluation( [ (eval_req.interval_start, eval_req.interval_end), @@ -317,4 +322,4 @@ def test_single_batched_evaluation_mixed( ) test_wait_for_evaluation_completion.assert_called_once() test_get_evaluation_results.assert_called_once() - test_cleanup_evaluations.assert_called_once() + assert test_cleanup_evaluations.call_count == 2 diff --git a/modyn/tests/supervisor/internal/test_grpc_handler.py b/modyn/tests/supervisor/internal/test_grpc_handler.py index 29dfadf61..6419e4e41 100644 --- a/modyn/tests/supervisor/internal/test_grpc_handler.py +++ b/modyn/tests/supervisor/internal/test_grpc_handler.py @@ -368,30 +368,22 @@ def test_wait_for_evaluation_completion(*args): assert handler.evaluator is not None with patch.object(handler.evaluator, "get_evaluation_status") as status_method: - status_method.side_effect = [ - EvaluationStatusResponse(valid=True, is_running=True), - EvaluationStatusResponse(valid=False), - EvaluationStatusResponse(valid=True, is_running=False), - ] - with pytest.raises(RuntimeError): - handler.wait_for_evaluation_completion(10) - assert status_method.call_count == 2 + with patch.object(handler.evaluator, "cleanup_evaluations") as _: + status_method.side_effect = [ + EvaluationStatusResponse(valid=True, is_running=True), + EvaluationStatusResponse(valid=True, is_running=False), + ] - status_method.reset_mock() - status_method.side_effect = [ - EvaluationStatusResponse(valid=True, exception="Some error"), - EvaluationStatusResponse(valid=True, is_running=False), - ] - assert not handler.wait_for_evaluation_completion(10) - assert status_method.call_count == 1 + assert handler.wait_for_evaluation_completion(10) + assert status_method.call_count == 2 - status_method.reset_mock() - status_method.side_effect = [ - EvaluationStatusResponse(valid=True, is_running=True), - EvaluationStatusResponse(valid=True, is_running=False), - ] - assert handler.wait_for_evaluation_completion(10) - assert status_method.call_count == 2 + status_method.reset_mock() + status_method.side_effect = [ + EvaluationStatusResponse(valid=True, exception="Some error"), + EvaluationStatusResponse(valid=True, is_running=False), + ] + assert not handler.wait_for_evaluation_completion(10) + assert status_method.call_count == 1 @patch("modyn.supervisor.internal.grpc_handler.grpc_connection_established", return_value=True) From 7d4c54869fcacafa81e3b8212d5c205f750a7ed1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= Date: Tue, 24 Sep 2024 23:29:15 +0800 Subject: [PATCH 10/14] fix pylint --- .../internal/grpc/selector_grpc_servicer.py | 17 +++++++++++++---- .../internal/triggers/datadrifttrigger.py | 2 +- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/modyn/selector/internal/grpc/selector_grpc_servicer.py b/modyn/selector/internal/grpc/selector_grpc_servicer.py index d3012b9fe..ea6f76984 100644 --- a/modyn/selector/internal/grpc/selector_grpc_servicer.py +++ b/modyn/selector/internal/grpc/selector_grpc_servicer.py @@ -58,8 +58,13 @@ def get_sample_keys_and_weights( # pylint: disable-next=unused-argument pid = os.getpid() logger.info( - f"[{pid}][{tid}][Pipeline {pipeline_id}]: Fetching samples for trigger id {trigger_id}" - + f" and worker id {worker_id} and partition id {partition_id}" + "[%s][%s][Pipeline %s]: Fetching samples for trigger id %s and worker id %s and partition id %s", + pid, + tid, + pipeline_id, + trigger_id, + worker_id, + partition_id, ) samples = self.selector_manager.get_sample_keys_and_weights(pipeline_id, trigger_id, worker_id, partition_id) @@ -90,8 +95,12 @@ def inform_data_and_trigger(self, request: DataInformRequest, context: grpc.Serv tid = threading.get_native_id() pid = os.getpid() logger.info( - f"[{pid}][{tid}][Pipeline {pipeline_id}]: Selector is informed of {len(keys)} new data points" - + f"+ trigger at timestamp {timestamps[-1] if len(keys) > 0 else 'n/a'}" + "[%s][%s][Pipeline %s]: Selector is informed of %s new data points + trigger at timestamp %s", + pid, + tid, + pipeline_id, + len(keys), + timestamps[-1] if keys else "n/a", ) trigger_id, log = self.selector_manager.inform_data_and_trigger(pipeline_id, keys, timestamps, labels) diff --git a/modyn/supervisor/internal/triggers/datadrifttrigger.py b/modyn/supervisor/internal/triggers/datadrifttrigger.py index 6dbac8d1b..59a478c5a 100644 --- a/modyn/supervisor/internal/triggers/datadrifttrigger.py +++ b/modyn/supervisor/internal/triggers/datadrifttrigger.py @@ -299,7 +299,7 @@ def _run_detection( metric_result.distance ) - logger.info(f"[DataDriftDetector][Dataset {self.dataloader_info.dataset_id}]" + f"[Result] {drift_results}") + logger.info("[DataDriftDetector][Dataset %s][Result] %s", self.dataloader_info.dataset_id, drift_results) if is_warmup: return False, {} From c717ffc101105d8077ccfbed7a2b98fcc8714a0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= Date: Tue, 24 Sep 2024 23:41:25 +0800 Subject: [PATCH 11/14] fix pylint --- .../internal/grpc/selector_grpc_servicer.py | 23 +++++++------------ .../internal/triggers/datadrifttrigger.py | 4 ++-- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/modyn/selector/internal/grpc/selector_grpc_servicer.py b/modyn/selector/internal/grpc/selector_grpc_servicer.py index ea6f76984..4db568387 100644 --- a/modyn/selector/internal/grpc/selector_grpc_servicer.py +++ b/modyn/selector/internal/grpc/selector_grpc_servicer.py @@ -57,15 +57,11 @@ def get_sample_keys_and_weights( # pylint: disable-next=unused-argument tid = threading.get_native_id() pid = os.getpid() - logger.info( - "[%s][%s][Pipeline %s]: Fetching samples for trigger id %s and worker id %s and partition id %s", - pid, - tid, - pipeline_id, - trigger_id, - worker_id, - partition_id, + _logmsg = ( + f"[{pid}][{tid}][Pipeline {pipeline_id}]: Fetching samples for trigger id {trigger_id}" + + f" and worker id {worker_id} and partition id {partition_id}" ) + logger.info(_logmsg) samples = self.selector_manager.get_sample_keys_and_weights(pipeline_id, trigger_id, worker_id, partition_id) @@ -94,14 +90,11 @@ def inform_data_and_trigger(self, request: DataInformRequest, context: grpc.Serv pipeline_id, keys, timestamps, labels = request.pipeline_id, request.keys, request.timestamps, request.labels tid = threading.get_native_id() pid = os.getpid() - logger.info( - "[%s][%s][Pipeline %s]: Selector is informed of %s new data points + trigger at timestamp %s", - pid, - tid, - pipeline_id, - len(keys), - timestamps[-1] if keys else "n/a", + _lgmsg = ( + f"[{pid}][{tid}][Pipeline {pipeline_id}]: Selector is informed of {len(keys)} new data points" + + f"+ trigger at timestamp {timestamps[-1] if len(keys) > 0 else 'n/a'}" ) + logger.info(_lgmsg) trigger_id, log = self.selector_manager.inform_data_and_trigger(pipeline_id, keys, timestamps, labels) return TriggerResponse(trigger_id=trigger_id, log=JsonString(value=json.dumps(log))) diff --git a/modyn/supervisor/internal/triggers/datadrifttrigger.py b/modyn/supervisor/internal/triggers/datadrifttrigger.py index 59a478c5a..6454a9211 100644 --- a/modyn/supervisor/internal/triggers/datadrifttrigger.py +++ b/modyn/supervisor/internal/triggers/datadrifttrigger.py @@ -298,8 +298,8 @@ def _run_detection( drift_results[metric_name].is_drift = self.decision_policies[metric_name].evaluate_decision( metric_result.distance ) - - logger.info("[DataDriftDetector][Dataset %s][Result] %s", self.dataloader_info.dataset_id, drift_results) + _logmsg = f"[DataDriftDetector][Dataset {self.dataloader_info.dataset_id}]" + f"[Result] {drift_results}" + logger.info(_logmsg) if is_warmup: return False, {} From 9a540e93f14a2ac0937b0f5c09cc92519212b556 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= Date: Tue, 24 Sep 2024 23:47:52 +0800 Subject: [PATCH 12/14] i hate pylint --- modyn/selector/internal/grpc/selector_grpc_servicer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modyn/selector/internal/grpc/selector_grpc_servicer.py b/modyn/selector/internal/grpc/selector_grpc_servicer.py index 4db568387..7750d5fe8 100644 --- a/modyn/selector/internal/grpc/selector_grpc_servicer.py +++ b/modyn/selector/internal/grpc/selector_grpc_servicer.py @@ -45,7 +45,7 @@ def __init__(self, selector_manager: SelectorManager, sample_batch_size: int): self.selector_manager = selector_manager self._sample_batch_size = sample_batch_size - def get_sample_keys_and_weights( # pylint: disable-next=unused-argument + def get_sample_keys_and_weights( # pylint: disable-next=unused-argument, too-many-locals self, request: GetSamplesRequest, context: grpc.ServicerContext ) -> Iterable[SamplesResponse]: pipeline_id, trigger_id, worker_id, partition_id = ( From 1d2cddb4f1cc994d0454e348dadab44b37601d7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= Date: Tue, 24 Sep 2024 23:52:51 +0800 Subject: [PATCH 13/14] too many locals --- .pylintrc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.pylintrc b/.pylintrc index 9469027d4..c94ec0da8 100644 --- a/.pylintrc +++ b/.pylintrc @@ -186,8 +186,9 @@ disable=raw-checker-failed, too-many-arguments, # we can't determine a good limit here. reviews should spot bad cases of this. duplicate-code, # Mostly imports and test setup. cyclic-import, # We use these inside methods that require models from multiple apps. Tests will catch actual errors. - too-many-instance-attributes, # We always ignore this anyways - too-many-positional-arguments # We do not want to limit the number of positional arguments + too-many-instance-attributes, # We always ignore this anyways + too-many-positional-arguments, # We do not want to limit the number of positional arguments + too-many-locals # We always ignore this anyways # Enable the message, report, category or checker with the given id(s). You can # either give multiple identifier separated by comma (,) or put this option # multiple time (only on the command line, not in the configuration file where From 404e0de2e9f3c9fcff0d4a548162bc6bfcc4f485 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20B=C3=B6ther?= Date: Thu, 26 Sep 2024 15:04:06 +0400 Subject: [PATCH 14/14] fix another test --- .../pipeline_executor/evaluation_executor.py | 6 ++++-- .../pipeline_executor/test_pipeline_executor.py | 17 ++++++++++++++--- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py b/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py index cb87fac43..75333d46f 100644 --- a/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py +++ b/modyn/supervisor/internal/pipeline_executor/evaluation_executor.py @@ -436,7 +436,7 @@ def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str: # Will trigger a retry in case this is not successful # Can happen, e.g., if the evaluator is overloaded expected_num_metrics = len(request.metrics) - for abort_reason, data_dict in eval_results: + for result_id, (abort_reason, data_dict) in enumerate(eval_results): if abort_reason is not None: # If there was any reason to abort, we don't care continue @@ -446,10 +446,12 @@ def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str: ), f"dataset size of 0, but no EMPTY_INTERVAL response: {eval_results}" actual_num_metrics = len(data_dict["metrics"]) assert actual_num_metrics == expected_num_metrics, ( - f"actual_num_metrics = {actual_num_metrics}" + f"result {result_id}: actual_num_metrics = {actual_num_metrics}" + f" != expected_num_metrics = {expected_num_metrics}" + "\n" + str(eval_results) + + "\n\n" + + str(eval_data) ) # All checks succeeded diff --git a/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py b/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py index d05a39d73..a4f870ee3 100644 --- a/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py +++ b/modyn/tests/supervisor/internal/pipeline_executor/test_pipeline_executor.py @@ -22,6 +22,7 @@ EvaluateModelResponse, EvaluationAbortedReason, EvaluationIntervalData, + SingleMetricResult, ) from modyn.supervisor.internal.eval.strategies.abstract import EvalInterval from modyn.supervisor.internal.eval.strategies.slicing import SlicingEvalStrategy @@ -826,7 +827,12 @@ def get_eval_intervals( ], ) ] - test_get_evaluation_results.return_value = [EvaluationIntervalData() for _ in range(3)] + test_get_evaluation_results.return_value = [ + EvaluationIntervalData( + interval_index=idx, evaluation_data=[SingleMetricResult(metric="Accuracy", result=0.5)] + ) + for idx in [0, 2] + ] else: intervals = [ @@ -851,7 +857,12 @@ def get_eval_intervals( evaluation_id=42, interval_responses=[success_interval for _ in range(len(intervals))], ) - test_get_evaluation_results.return_value = [EvaluationIntervalData() for _ in range(len(intervals))] + test_get_evaluation_results.return_value = [ + EvaluationIntervalData( + interval_index=idx, evaluation_data=[SingleMetricResult(metric="Accuracy", result=0.5)] + ) + for idx in range(len(intervals)) + ] pe.grpc.evaluator = evaluator_stub_mock @@ -869,7 +880,7 @@ def get_eval_intervals( assert evaluator_stub_mock.evaluate_model.call_count == 1 # batched if test_failure: - assert test_cleanup_evaluations.call_count == 1 + assert test_cleanup_evaluations.call_count == 2 assert test_wait_for_evaluation_completion.call_count == 1 stage_info = [