diff --git a/user_tools/src/spark_rapids_pytools/common/utilities.py b/user_tools/src/spark_rapids_pytools/common/utilities.py index 5fbfadd2b..100e77d3e 100644 --- a/user_tools/src/spark_rapids_pytools/common/utilities.py +++ b/user_tools/src/spark_rapids_pytools/common/utilities.py @@ -108,15 +108,20 @@ def find_full_rapids_tools_env_key(cls, actual_key: str) -> str: def get_sys_env_var(cls, k: str, def_val=None) -> Optional[str]: return os.environ.get(k, def_val) - @classmethod - def get_rapids_tools_env(cls, k: str, def_val=None): - val = cls.get_sys_env_var(cls.find_full_rapids_tools_env_key(k), def_val) - return val - @classmethod def set_rapids_tools_env(cls, k: str, val): os.environ[cls.find_full_rapids_tools_env_key(k)] = str(val) + @classmethod + def get_or_set_rapids_tools_env(cls, k: str, default_val=None) -> Optional[str]: + full_key = cls.find_full_rapids_tools_env_key(k) + current_val = cls.get_sys_env_var(full_key, None) + if current_val is None or (isinstance(current_val, str) and current_val == ''): + if default_val is not None: + cls.set_rapids_tools_env(k, default_val) + return str(default_val) + return current_val + @classmethod def gen_str_header(cls, title: str, ruler='-', line_width: int = 40) -> str: dash = ruler * line_width @@ -201,15 +206,22 @@ def get_log_dict(cls, args): 'disable_existing_loggers': False, 'formatters': { 'simple': { - 'format': '{asctime} {levelname} {name}: {message}', + 'format': '{asctime} {levelname} {run_id_tag} {name}: {message}', 'style': '{', + 'datefmt': '%H:%M:%S', }, }, + 'filters': { + 'run_id': { + '()': 'spark_rapids_pytools.common.utilities.RunIdContextFilter' + } + }, 'handlers': { 'console': { 'class': 'logging.StreamHandler', 'formatter': 'simple', 'level': 'DEBUG' if args.get('debug') else 'ERROR', + 'filters': ['run_id'] }, }, 'root': { @@ -224,25 +236,47 @@ def enable_debug_mode(cls): @classmethod def is_debug_mode_enabled(cls): - return Utils.get_rapids_tools_env('LOG_DEBUG') + return 
Utils.get_or_set_rapids_tools_env('LOG_DEBUG') @classmethod def get_and_setup_logger(cls, type_label: str, debug_mode: bool = False): - debug_enabled = bool(Utils.get_rapids_tools_env('LOG_DEBUG', debug_mode)) + debug_enabled = bool(Utils.get_or_set_rapids_tools_env('LOG_DEBUG', debug_mode)) cls._ensure_configured(debug_enabled) logger = logging.getLogger(type_label) - log_file = Utils.get_rapids_tools_env('LOG_FILE') - # Ensure multiple handlers are not added - if log_file and not logger.handlers: - fh = logging.FileHandler(log_file) - fh.setLevel(logging.DEBUG) - formatter = logging.Formatter('{asctime} {levelname} {name}: {message}', style='{') - fh.setFormatter(formatter) - logger.addHandler(fh) + + # ToolLogging is a module-level class. + # When multiple instances of another class (Profiling/Qualification) are created, + # the logger for a given type_label is registered only once, so a FileHandler + # bound to an older LOG_FILE would otherwise never be replaced. Hence, we need to + # rebind the FileHandler every time we get a logger instance for a type_label. + cls._rebind_file_handler(logger, Utils.get_or_set_rapids_tools_env('LOG_FILE')) return logger + @classmethod + def _rebind_file_handler(cls, logger: logging.Logger, log_file: str) -> None: + # Remove existing FileHandlers to avoid stale paths and duplicates + # Stale paths can occur if LOG_FILE env var changes between calls + # or if multiple instances of Profiling/Qualification are created.
+ for handler in list(logger.handlers): + if isinstance(handler, logging.FileHandler): + logger.removeHandler(handler) + handler.close() + # Attach a FileHandler if LOG_FILE is set + if not log_file: + return + fh = logging.FileHandler(log_file) + fh.setLevel(logging.DEBUG) + fh.addFilter(RunIdContextFilter()) + formatter = logging.Formatter( + '{asctime} {levelname} {run_id_tag} {name}: {message}', + style='{', + datefmt='%Y-%m-%d %H:%M:%S' + ) + fh.setFormatter(formatter) + logger.addHandler(fh) + @classmethod def modify_log4j_properties(cls, prop_file_path: str, new_log_file: str) -> str: """ @@ -274,6 +308,18 @@ def modify_log4j_properties(cls, prop_file_path: str, new_log_file: str) -> str: return temp_file.name +class RunIdContextFilter(logging.Filter): # pylint: disable=too-few-public-methods + """ + Pulls RUN_ID from environment; if absent, the tag is omitted entirely. + """ + + def filter(self, record: logging.LogRecord) -> bool: + run_id = Utils.get_or_set_rapids_tools_env('RUN_ID') + tag = f' [{run_id}]' if run_id else '' + setattr(record, 'run_id_tag', tag) + return True + + class TemplateGenerator: """A class to manage templates and content generation""" diff --git a/user_tools/src/spark_rapids_pytools/pricing/price_provider.py b/user_tools/src/spark_rapids_pytools/pricing/price_provider.py index 90a556e3b..ceb4da09f 100644 --- a/user_tools/src/spark_rapids_pytools/pricing/price_provider.py +++ b/user_tools/src/spark_rapids_pytools/pricing/price_provider.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023-2024, NVIDIA CORPORATION. +# Copyright (c) 2023-2025, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -61,7 +61,7 @@ def _generate_cache_files(self): def __post_init__(self): self.logger = ToolLogging.get_and_setup_logger(f'rapids.tools.price.{self.name}') - self.cache_directory = Utils.get_rapids_tools_env('CACHE_FOLDER') + self.cache_directory = Utils.get_or_set_rapids_tools_env('CACHE_FOLDER') self._process_configs() self._init_catalogs() diff --git a/user_tools/src/spark_rapids_pytools/rapids/dev/instance_description.py b/user_tools/src/spark_rapids_pytools/rapids/dev/instance_description.py index 6284375a8..5f8780772 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/dev/instance_description.py +++ b/user_tools/src/spark_rapids_pytools/rapids/dev/instance_description.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -53,7 +53,7 @@ def _init_ctxt(self) -> None: def _process_output_args(self) -> None: self.logger.debug('Processing Output Arguments') if self.output_folder is None: - self.output_folder = Utils.get_rapids_tools_env('OUTPUT_DIRECTORY', os.getcwd()) + self.output_folder = Utils.get_or_set_rapids_tools_env('OUTPUT_DIRECTORY', os.getcwd()) # make sure that output_folder is being absolute self.output_folder = FSUtil.get_abs_path(self.output_folder) FSUtil.make_dirs(self.output_folder) diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification_stats.py b/user_tools/src/spark_rapids_pytools/rapids/qualification_stats.py index a03c0d442..89aa8962f 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification_stats.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification_stats.py @@ -66,12 +66,10 @@ def _process_output_args(self) -> None: self.logger.debug('Processing Output Arguments') if self.output_folder is None: self.output_folder = os.getcwd() - self.output_folder = FSUtil.get_abs_path(self.output_folder) - exec_dir_name 
= f'{self.name}_{self.ctxt.uuid}' - # It should never happen that the exec_dir_name exists - self.output_folder = FSUtil.build_path(self.output_folder, exec_dir_name) - FSUtil.make_dirs(self.output_folder, exist_ok=False) - self.ctxt.set_local('outputFolder', self.output_folder) + parent_dir = FSUtil.get_abs_path(self.output_folder) + # Use ToolContext; it ensures RUN_ID consistency safely + self.ctxt.set_local_directories(parent_dir) + self.output_folder = self.ctxt.get_output_folder() self.logger.info('Local output folder is set as: %s', self.output_folder) # Add QualCoreHandler to the context self.ctxt.set_ctxt('coreHandler', QualCore(self.qual_output)) diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py index 0660b5215..d59a0b844 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py +++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_job.py @@ -195,7 +195,9 @@ def _build_jvm_args(self): rapids_output_folder = self.exec_ctxt.get_rapids_output_folder() log4j_file_name = self.exec_ctxt.get_log4j_properties_file() jvm_arg = ToolLogging.modify_log4j_properties( - jvm_arg, f'{rapids_output_folder}/{log4j_file_name}') + jvm_arg, + f'{rapids_output_folder}/{log4j_file_name}' + ) self.exec_ctxt.set_local('tmp_log4j', jvm_arg) val = f'-{jvm_k}={jvm_arg}' else: diff --git a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py index ddfbde502..3c00b67f3 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py +++ b/user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py @@ -160,7 +160,7 @@ def _process_output_args(self): self.logger.debug('Processing Output Arguments') # make sure output_folder is absolute if self.output_folder is None: - self.output_folder = Utils.get_rapids_tools_env('OUTPUT_DIRECTORY', os.getcwd()) + self.output_folder = Utils.get_or_set_rapids_tools_env('OUTPUT_DIRECTORY', 
os.getcwd()) try: output_folder_path = LocalPath(self.output_folder) self.output_folder = output_folder_path.no_scheme @@ -235,6 +235,10 @@ def cleanup_run(self) -> None: # Ignore the exception here because this might be called toward the end/failure # and we do want to avoid nested exceptions. self.logger.debug('Failed to cleanup run') + finally: + # Clear RUN_ID to avoid leaking across runs in same process (single-run assumption) + env_key = Utils.find_full_rapids_tools_env_key('RUN_ID') + os.environ.pop(env_key, None) def _delete_local_dep_folder(self): # clean_up the local dependency folder diff --git a/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py b/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py index 7a89691f3..8d2a9a6ad 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py +++ b/user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py @@ -58,14 +58,28 @@ def __connect_to_platform(self): self.platform = self.platform_cls(ctxt_args=self.platform_opts) def __create_and_set_uuid(self): + # The common sessionUuid is currently only set in ProfilingCore + # and QualCore. Need to have the RUN_ID alignment for + # all tools that use ToolContext if self.platform_opts.get('sessionUuid'): self.uuid = self.platform_opts['sessionUuid'] - else: - self.uuid = Utils.gen_uuid_with_ts(suffix_len=8) + return + # If RUN_ID is provided (in init_environment), align uuid with it + # RUN_ID is expected to be in the format _