diff --git a/deploy/contents/install/preferences/launch.system.parameters.json b/deploy/contents/install/preferences/launch.system.parameters.json index d183c61a80..5efb5d1202 100644 --- a/deploy/contents/install/preferences/launch.system.parameters.json +++ b/deploy/contents/install/preferences/launch.system.parameters.json @@ -597,5 +597,26 @@ "defaultValue": "true", "description": "Changes default mail client if favor to pipe_mail script.", "passToWorkers": true + }, + { + "name": "CP_CAP_MOUNT_STORAGE_ATTEMPTS", + "type": "int", + "defaultValue": "1", + "description": "Specifies a number of single storage mount attempts", + "passToWorkers": true + }, + { + "name": "CP_CAP_MOUNT_STORAGE_RETRY_DELAY_SECONDS", + "type": "int", + "defaultValue": "1", + "description": "Specifies a delay in seconds between single storage mount retries", + "passToWorkers": true + }, + { + "name": "CP_CAP_MOUNT_STORAGE_LOGGING_LEVEL", + "type": "string", + "defaultValue": "INFO", + "description": "Specifies mount storages task logging level", + "passToWorkers": true } ] diff --git a/workflows/pipe-common/pipeline/utils/retry.py b/workflows/pipe-common/pipeline/utils/retry.py new file mode 100644 index 0000000000..863186e272 --- /dev/null +++ b/workflows/pipe-common/pipeline/utils/retry.py @@ -0,0 +1,34 @@ +# Copyright 2017-2023 EPAM Systems, Inc. (https://www.epam.com/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + + +def retry(func, attempts, delay_seconds, logger): + delay_seconds = max(delay_seconds, 0) + attempts = max(attempts, 1) + attempt = 0 + exceptions = [] + while attempt < attempts: + attempt += 1 + try: + return func(attempt, attempts) + except Exception as e: + exceptions.append(e) + if attempt >= attempts: + raise exceptions[-1] + logger.debug('Attempt {attempt}/{attempts} has failed. It will be retried in {delay_seconds} s.' + .format(attempt=attempt, attempts=attempts, delay_seconds=delay_seconds), + trace=True) + time.sleep(delay_seconds) diff --git a/workflows/pipe-common/scripts/mount_storage.py b/workflows/pipe-common/scripts/mount_storage.py index e512b62ad9..0327a9a25c 100644 --- a/workflows/pipe-common/scripts/mount_storage.py +++ b/workflows/pipe-common/scripts/mount_storage.py @@ -25,7 +25,9 @@ import traceback from abc import ABCMeta, abstractmethod -from pipeline import PipelineAPI, Logger, common, DataStorageWithShareMount, AclClass, APIError +from pipeline import PipelineAPI, common, DataStorageWithShareMount, AclClass, APIError +from pipeline.log.logger import Logger, LevelLogger, TaskLogger, RunLogger +from pipeline.utils.retry import retry READ_MASK = 1 WRITE_MASK = 1 << 1 @@ -112,6 +114,11 @@ def __init__(self, task): self.run_id = int(os.getenv('RUN_ID', 0)) self.region_id = int(os.getenv('CLOUD_REGION_ID', 0)) self.task_name = task + self.logging_level = os.getenv('CP_CAP_MOUNT_STORAGE_LOGGING_LEVEL', 'INFO') + logger = RunLogger(api=self.api, run_id=self.run_id) + logger = TaskLogger(task=self.task_name, inner=logger) + logger = LevelLogger(level=self.logging_level, inner=logger) + self.logger = logger if platform.system() == 'Windows': available_mounters = [S3Mounter, GCPMounter] else: @@ -281,7 +288,8 @@ def run(self, mount_root, tmp_dir): mounter = self.mounters[storage_and_mount.storage.storage_type](self.api, storage_and_mount.storage, storage_metadata, storage_and_mount.file_share_mount, - sensitive_policy) \ + sensitive_policy, + self.logger, self.logging_level) \ if storage_and_mount.storage.storage_type in self.mounters else None if not mounter: Logger.warn('Unsupported storage type {}.'.format(storage_and_mount.storage.storage_type), task_name=self.task_name) @@ -331,12 +339,14 @@ class StorageMounter: __metaclass__ = ABCMeta _cached_regions = [] - def __init__(self, api, storage, metadata, share_mount, sensitive_policy): + def __init__(self, api, storage, metadata, share_mount, sensitive_policy, logger, logging_level): self.api = api self.storage = storage self.metadata = metadata self.share_mount = share_mount self.sensitive_policy = sensitive_policy + self.logger = logger + self.logging_level = logging_level @staticmethod @abstractmethod @@ -388,7 +398,7 @@ def mount(self, mount_root, task_name): return params = self.build_mount_params(mount_point) mount_command = self.build_mount_command(params) - self.execute_mount(mount_command, params, task_name) + self.execute_mount(mount_command, params, task_name, logger=self.logger) def build_mount_point(self, mount_root): mount_point = self.storage.mount_point @@ -405,13 +415,28 @@ def build_mount_command(self, params): pass @staticmethod - def execute_mount(command, params, task_name): - result = common.execute_cmd_command(command, executable=None if StorageMounter.is_windows() else '/bin/bash') - if result == 0: - Logger.info('-->{path} mounted to {mount}'.format(**params), task_name=task_name) - else: + def execute_mount(command, params, task_name, logger): + logger.debug('Executing: {command}'.format(command=command, **params)) + try: + retry(lambda attempt, attempts: StorageMounter._execute_mount(command, params, logger, + attempt, attempts), + attempts=max(int(os.getenv('CP_CAP_MOUNT_STORAGE_ATTEMPTS', '1')), 1), + delay_seconds=max(int(os.getenv('CP_CAP_MOUNT_STORAGE_RETRY_DELAY_SECONDS', '1')), 0), + logger=logger) + Logger.info('--> Mounted {path} to {mount}'.format(**params), task_name=task_name) + except Exception: Logger.warn('--> Failed mounting {path} to {mount}'.format(**params), task_name=task_name) + @staticmethod + def _execute_mount(command, params, logger, attempt, attempts): + exit_code = common.execute_cmd_command(command, executable=None if StorageMounter.is_windows() else '/bin/bash') + if not exit_code: + return + err_msg = '--> Failed mounting {path} to {mount} ({attempt}/{attempts} attempt)' \ + .format(attempt=attempt, attempts=attempts, **params) + logger.warning(err_msg) + raise RuntimeError(err_msg) + def get_path(self): return self.storage.path.replace(self.scheme(), '', 1) @@ -801,6 +826,8 @@ def build_mount_command(self, params): mount_options = self.append_timeout_options(mount_options) if mount_options: command += ' -o {}'.format(mount_options) + if self.logging_level == 'DEBUG': + command += ' -v' command += ' {path} {mount}'.format(**params) if PermissionHelper.is_storage_writable(self.storage): command += ' && chmod {permission} {mount}'.format(permission=permission, **params) diff --git a/workflows/pipe-common/shell/mount_storages b/workflows/pipe-common/shell/mount_storages index 3092e6087b..1ec73ee651 100644 --- a/workflows/pipe-common/shell/mount_storages +++ b/workflows/pipe-common/shell/mount_storages @@ -49,7 +49,7 @@ if [ -z "$MOUNT_TASK_NAME" ] exit 1 fi -MOUNT_RESULT=$(eval "$CP_PYTHON2_PATH ${COMMON_REPO_DIR}/scripts/mount_storage.py --mount-root ${MOUNT_ROOT} --tmp-dir ${TMP_DIR} --task ${MOUNT_TASK_NAME}") +"$CP_PYTHON2_PATH" "${COMMON_REPO_DIR}/scripts/mount_storage.py" --mount-root "${MOUNT_ROOT}" --tmp-dir "${TMP_DIR}" --task "${MOUNT_TASK_NAME}" if [ $? -ne 0 ] then