diff --git a/app/processing_watchdog.py b/app/processing_watchdog.py new file mode 100644 index 0000000..945c657 --- /dev/null +++ b/app/processing_watchdog.py @@ -0,0 +1,106 @@ +""" + Watchdog для мониторинга и прерывания длительных обработок попыток и их прерывания + Запускать как отдельный сервис + + Два значения читаются из конфига - макс. время попытки и интервал проверок + [constants] + processing_limit = () + interval_time = () + + Взаимодействие с базой данных: + - Использует TrainingsDBManager для доступа к тренировкам + - Находит все тренировки, у которых задано поле `processing_start_timestamp` + - Если прошло больше processing_limit секунд: + * Добавляет в базу вердикт с комментарием "обратиться к администраторам" через append_verdict() + * Устанавливает оценку 0 через set_score() + * Меняет статусы presentation и audio на PROCESSING_FAILED, если они ещё не финальные + - После каждой итерации ждёт interval_time секунд и повторяет проверку +""" + +import time +from datetime import datetime, timezone + +from app.config import Config +from app.mongo_odm import TrainingsDBManager +from app.root_logger import get_root_logger +from app.status import TrainingStatus, PresentationStatus, AudioStatus + +logger = get_root_logger(service_name='processing_watchdog') + +DEFAULT_MAX_SECONDS = 300 +DEFAULT_INTERVAL_SECONDS = 30 + +def get_config_values(): + try: + return int(Config.c.constants.processing_limit), int(Config.c.constants.interval_time) + except Exception as e: + logger.warning("Failed to read config values, using defaults: %s", e) + return DEFAULT_MAX_SECONDS, DEFAULT_INTERVAL_SECONDS + +def time_now(): + return datetime.now(timezone.utc) + +def run_once(max_seconds): + """ + Ищет все тренировки, у которых задан processing_start_timestamp + и проверяет их статус + """ + now = time_now() + trainings = TrainingsDBManager().get_trainings() + candidates = [] + for training in trainings: + process_started = getattr(training, "processing_start_timestamp", None) + if process_started is None: + continue + try: + started = datetime.fromtimestamp(process_started.time, timezone.utc) + except Exception as e: + logger.warning("Invalid timestamp in training %s: %s", getattr(training, "_id", "?"), e) + elapsed = (now - started).total_seconds() + if elapsed <= max_seconds: + continue + status = getattr(training, "status", None) + if status in [TrainingStatus.PROCESSED, TrainingStatus.PROCESSING_FAILED, TrainingStatus.PREPARATION_FAILED]: + continue + candidates.append((training, elapsed)) + for training, elapsed in candidates: + training_id = getattr(training, "_id", None) + if training_id is None: + continue + try: + msg = ("Техническая ошибка: время обработки превысило лимит {:.0f} секунд. " + "Оценка выставлена автоматически как 0. Пожалуйста, обратитесь к администраторам." + ).format(elapsed) + logger.warning("Training %s exceeded processing timeout (%.0f s)", training_id, elapsed) + TrainingsDBManager().append_verdict(training_id, msg) + TrainingsDBManager().set_score(training_id, 0) + try: + pres_status = getattr(training, "presentation_status", None) + if pres_status not in [PresentationStatus.PROCESSED, PresentationStatus.PROCESSING_FAILED]: + TrainingsDBManager().change_presentation_status(training_id, PresentationStatus.PROCESSING_FAILED) + except Exception as e: + logger.exception("Failed to mark presentation status as failed for training %s: %s", training_id, e) + try: + audio_status = getattr(training, "audio_status", None) + if audio_status not in [AudioStatus.PROCESSED, AudioStatus.PROCESSING_FAILED]: + TrainingsDBManager().change_audio_status(training_id, AudioStatus.PROCESSING_FAILED) + except Exception as e: + logger.exception("Failed to mark audio status as failed for training %s: %s", training_id, e) + except Exception as e: + logger.exception("Error while handling timeout for training %s: %s", training_id, e) + +def run(): + max_seconds, interval_time = get_config_values() + logger.info("Processing watchdog started. Timeout = %s s, interval time = %s s", max_seconds, interval_time) + while True: + try: + run_once(max_seconds) + except Exception as e: + logger.exception("Unhandled error in processing watchdog main loop (%s)", e) + time.sleep(interval_time) + +if __name__ == "__main__": + import sys + if len(sys.argv) >= 2: + Config.init_config(sys.argv[1]) + run() \ No newline at end of file diff --git a/app_conf/config.ini b/app_conf/config.ini index 553e675..97c80a7 100644 --- a/app_conf/config.ini +++ b/app_conf/config.ini @@ -6,6 +6,8 @@ lti_consumer_secret=supersecretconsumersecret version_file=VERSION.json backup_path=../dump/database-dump/ storage_max_size_mbytes=20000 +processing_limit = 600 +interval_time = 30 [mongodb] url=mongodb://db:27017/ diff --git a/app_conf/testing.ini b/app_conf/testing.ini index dd228f4..75c2290 100644 --- a/app_conf/testing.ini +++ b/app_conf/testing.ini @@ -6,6 +6,8 @@ lti_consumer_secret=testing_lti_consumer_secret version_file=VERSION.json backup_path=../dump/database-dump/ storage_max_size_mbytes=20000 +processing_limit = 600 +interval_time = 30 [mongodb] url=mongodb://db:27017/ diff --git a/docker-compose.yml b/docker-compose.yml index 303a30e..4c07ba0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -86,6 +86,13 @@ services: volumes: - whisper_models:/root/.cache/whisper + processing_watchdog: + image: wst-image:v0.2 + command: python3 -m processing_watchdog $APP_CONF + restart: always + depends_on: + - db + volumes: whisper_models: nltk_data: