Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 62 additions & 16 deletions user_tools/src/spark_rapids_pytools/common/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,20 @@ def find_full_rapids_tools_env_key(cls, actual_key: str) -> str:
def get_sys_env_var(cls, k: str, def_val=None) -> Optional[str]:
return os.environ.get(k, def_val)

@classmethod
def get_rapids_tools_env(cls, k: str, def_val=None):
    """Read the RAPIDS-tools-scoped environment variable for ``k``.

    The short key is expanded to its full environment name before lookup;
    ``def_val`` is returned when the variable is not set.
    """
    full_key = cls.find_full_rapids_tools_env_key(k)
    return cls.get_sys_env_var(full_key, def_val)

@classmethod
def set_rapids_tools_env(cls, k: str, val):
    """Store ``val`` (stringified) under the RAPIDS-tools-scoped key for ``k``."""
    env_key = cls.find_full_rapids_tools_env_key(k)
    os.environ[env_key] = str(val)

@classmethod
def get_or_set_rapids_tools_env(cls, k: str, default_val=None) -> Optional[str]:
    """Return the RAPIDS-tools env var for ``k``, seeding it from ``default_val``.

    If the variable is unset or empty and ``default_val`` is not None, the
    variable is set to ``str(default_val)`` and that string is returned.
    Otherwise the current value (possibly None or '') is returned unchanged.

    NOTE(review): a falsy-but-not-None default (e.g. ``False`` or ``0``) is
    stored and returned as a non-empty string ('False', '0'), which is truthy.
    Callers that apply ``bool(...)`` to the result must not pass such
    defaults — confirm the LOG_DEBUG call sites.
    """
    full_key = cls.find_full_rapids_tools_env_key(k)
    current_val = cls.get_sys_env_var(full_key, None)
    # os.environ values are always str, so `not current_val` is exactly
    # "None or empty string" — the isinstance check was redundant.
    if not current_val:
        if default_val is not None:
            cls.set_rapids_tools_env(k, default_val)
            return str(default_val)
    return current_val

@classmethod
def gen_str_header(cls, title: str, ruler='-', line_width: int = 40) -> str:
dash = ruler * line_width
Expand Down Expand Up @@ -201,15 +206,22 @@ def get_log_dict(cls, args):
'disable_existing_loggers': False,
'formatters': {
'simple': {
'format': '{asctime} {levelname} {name}: {message}',
'format': '{asctime} {levelname} {run_id_tag} {name}: {message}',
'style': '{',
'datefmt': '%H:%M:%S',
},
},
'filters': {
'run_id': {
'()': 'spark_rapids_pytools.common.utilities.RunIdContextFilter'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'simple',
'level': 'DEBUG' if args.get('debug') else 'ERROR',
'filters': ['run_id']
},
},
'root': {
Expand All @@ -224,25 +236,47 @@ def enable_debug_mode(cls):

@classmethod
def is_debug_mode_enabled(cls):
return Utils.get_rapids_tools_env('LOG_DEBUG')
return Utils.get_or_set_rapids_tools_env('LOG_DEBUG')

@classmethod
def get_and_setup_logger(cls, type_label: str, debug_mode: bool = False):
debug_enabled = bool(Utils.get_rapids_tools_env('LOG_DEBUG', debug_mode))
debug_enabled = bool(Utils.get_or_set_rapids_tools_env('LOG_DEBUG', debug_mode))

cls._ensure_configured(debug_enabled)

logger = logging.getLogger(type_label)
log_file = Utils.get_rapids_tools_env('LOG_FILE')
# Ensure multiple handlers are not added
if log_file and not logger.handlers:
fh = logging.FileHandler(log_file)
fh.setLevel(logging.DEBUG)
formatter = logging.Formatter('{asctime} {levelname} {name}: {message}', style='{')
fh.setFormatter(formatter)
logger.addHandler(fh)

# ToolLogging is a module level class
# For multiple instances of another class(Profiling/Qualification),
# the logger corresponding to that is registered only once.
# So any new/updated FileHandler are not updated. Hence, we need to
# rebind the FileHandler every time we get a logger instance for a type_label
cls._rebind_file_handler(logger, Utils.get_or_set_rapids_tools_env('LOG_FILE'))
return logger

@classmethod
def _rebind_file_handler(cls, logger: logging.Logger, log_file: Optional[str]) -> None:
    """Replace any FileHandler on ``logger`` with one targeting ``log_file``.

    Called on every ``get_and_setup_logger`` so the handler always points at
    the current LOG_FILE value. When ``log_file`` is falsy (None/empty), the
    logger is left with no file handler at all.
    """
    # Remove existing FileHandlers to avoid stale paths and duplicates
    # Stale paths can occur if LOG_FILE env var changes between calls
    # or if multiple instances of Profiling/Qualification are created.
    for handler in list(logger.handlers):
        if isinstance(handler, logging.FileHandler):
            # Detach before closing so no record can hit a closed stream.
            logger.removeHandler(handler)
            handler.close()
    # Attach a FileHandler if LOG_FILE is set
    if not log_file:
        return
    fh = logging.FileHandler(log_file)
    fh.setLevel(logging.DEBUG)
    # The filter supplies the {run_id_tag} field used by the formatter below.
    fh.addFilter(RunIdContextFilter())
    formatter = logging.Formatter(
        '{asctime} {levelname} {run_id_tag} {name}: {message}',
        style='{',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    fh.setFormatter(formatter)
    logger.addHandler(fh)

@classmethod
def modify_log4j_properties(cls, prop_file_path: str, new_log_file: str) -> str:
"""
Expand Down Expand Up @@ -274,6 +308,18 @@ def modify_log4j_properties(cls, prop_file_path: str, new_log_file: str) -> str:
return temp_file.name


class RunIdContextFilter(logging.Filter):  # pylint: disable=too-few-public-methods
    """Logging filter injecting a ``run_id_tag`` attribute on each record.

    The tag is derived from the RUN_ID environment entry; when RUN_ID is
    absent the tag is an empty string, so the formatter renders no bracket.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        current_run_id = Utils.get_or_set_rapids_tools_env('RUN_ID')
        record.run_id_tag = f' [{current_run_id}]' if current_run_id else ''
        # Always let the record through; this filter only annotates.
        return True


class TemplateGenerator:
"""A class to manage templates and content generation"""

Expand Down
4 changes: 2 additions & 2 deletions user_tools/src/spark_rapids_pytools/pricing/price_provider.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
# Copyright (c) 2023-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,7 +61,7 @@ def _generate_cache_files(self):

def __post_init__(self):
self.logger = ToolLogging.get_and_setup_logger(f'rapids.tools.price.{self.name}')
self.cache_directory = Utils.get_rapids_tools_env('CACHE_FOLDER')
self.cache_directory = Utils.get_or_set_rapids_tools_env('CACHE_FOLDER')
self._process_configs()
self._init_catalogs()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
# Copyright (c) 2024-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,7 +53,7 @@ def _init_ctxt(self) -> None:
def _process_output_args(self) -> None:
self.logger.debug('Processing Output Arguments')
if self.output_folder is None:
self.output_folder = Utils.get_rapids_tools_env('OUTPUT_DIRECTORY', os.getcwd())
self.output_folder = Utils.get_or_set_rapids_tools_env('OUTPUT_DIRECTORY', os.getcwd())
# make sure that output_folder is being absolute
self.output_folder = FSUtil.get_abs_path(self.output_folder)
FSUtil.make_dirs(self.output_folder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,10 @@ def _process_output_args(self) -> None:
self.logger.debug('Processing Output Arguments')
if self.output_folder is None:
self.output_folder = os.getcwd()
self.output_folder = FSUtil.get_abs_path(self.output_folder)
exec_dir_name = f'{self.name}_{self.ctxt.uuid}'
# It should never happen that the exec_dir_name exists
self.output_folder = FSUtil.build_path(self.output_folder, exec_dir_name)
FSUtil.make_dirs(self.output_folder, exist_ok=False)
self.ctxt.set_local('outputFolder', self.output_folder)
parent_dir = FSUtil.get_abs_path(self.output_folder)
# Use ToolContext; it ensures RUN_ID consistency safely
self.ctxt.set_local_directories(parent_dir)
self.output_folder = self.ctxt.get_output_folder()
self.logger.info('Local output folder is set as: %s', self.output_folder)
# Add QualCoreHandler to the context
self.ctxt.set_ctxt('coreHandler', QualCore(self.qual_output))
Expand Down
4 changes: 3 additions & 1 deletion user_tools/src/spark_rapids_pytools/rapids/rapids_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,9 @@ def _build_jvm_args(self):
rapids_output_folder = self.exec_ctxt.get_rapids_output_folder()
log4j_file_name = self.exec_ctxt.get_log4j_properties_file()
jvm_arg = ToolLogging.modify_log4j_properties(
jvm_arg, f'{rapids_output_folder}/{log4j_file_name}')
jvm_arg,
f'{rapids_output_folder}/{log4j_file_name}'
)
self.exec_ctxt.set_local('tmp_log4j', jvm_arg)
val = f'-{jvm_k}={jvm_arg}'
else:
Expand Down
6 changes: 5 additions & 1 deletion user_tools/src/spark_rapids_pytools/rapids/rapids_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def _process_output_args(self):
self.logger.debug('Processing Output Arguments')
# make sure output_folder is absolute
if self.output_folder is None:
self.output_folder = Utils.get_rapids_tools_env('OUTPUT_DIRECTORY', os.getcwd())
self.output_folder = Utils.get_or_set_rapids_tools_env('OUTPUT_DIRECTORY', os.getcwd())
try:
output_folder_path = LocalPath(self.output_folder)
self.output_folder = output_folder_path.no_scheme
Expand Down Expand Up @@ -235,6 +235,10 @@ def cleanup_run(self) -> None:
# Ignore the exception here because this might be called toward the end/failure
# and we do want to avoid nested exceptions.
self.logger.debug('Failed to cleanup run')
finally:
# Clear RUN_ID to avoid leaking across runs in same process (single-run assumption)
env_key = Utils.find_full_rapids_tools_env_key('RUN_ID')
os.environ.pop(env_key, None)

def _delete_local_dep_folder(self):
# clean_up the local dependency folder
Expand Down
27 changes: 23 additions & 4 deletions user_tools/src/spark_rapids_pytools/rapids/tool_ctxt.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,28 @@ def __connect_to_platform(self):
self.platform = self.platform_cls(ctxt_args=self.platform_opts)

def __create_and_set_uuid(self):
# The common sessionUuid is currently only set in ProfilingCore
# and QualCore. Need to have the RUN_ID alignment for
# all tools that use ToolContext
if self.platform_opts.get('sessionUuid'):
self.uuid = self.platform_opts['sessionUuid']
else:
self.uuid = Utils.gen_uuid_with_ts(suffix_len=8)
return
# If RUN_ID is provided (in init_environment), align uuid with it
# RUN_ID is expected to be in the format <name>_<time>_<unique_id>
# Safe access is needed in case of non-cli based context access that
# do not trigger init_environment
run_id = Utils.get_or_set_rapids_tools_env('RUN_ID')
if isinstance(run_id, str) and run_id:
parts = run_id.split('_')
if len(parts) >= 3:
self.uuid = '_'.join(parts[-2:])
return
# Default behavior
self.uuid = Utils.gen_uuid_with_ts(suffix_len=8)

def __create_and_set_cache_folder(self):
# get the cache folder from environment variables or set it to default
cache_folder = Utils.get_rapids_tools_env('CACHE_FOLDER', '/var/tmp/spark_rapids_user_tools_cache')
cache_folder = Utils.get_or_set_rapids_tools_env('CACHE_FOLDER', '/var/tmp/spark_rapids_user_tools_cache')
# make sure the environment is set
Utils.set_rapids_tools_env('CACHE_FOLDER', cache_folder)
FSUtil.make_dirs(cache_folder)
Expand Down Expand Up @@ -154,7 +168,12 @@ def set_local_directories(self, output_parent_folder: str) -> None:
:param output_parent_folder: the directory where the local output is going to be created.
"""
short_name = self.get_value('platform', 'shortName')
exec_dir_name = f'{short_name}_{self.uuid}'
# If RUN_ID is provided, use it verbatim to ensure exact match with logging RUN_ID
run_id = Utils.get_or_set_rapids_tools_env('RUN_ID')
exec_dir_name = run_id if run_id else f'{short_name}_{self.uuid}'
# Ensure RUN_ID is set when absent (non-CLI usage); unify logs and folder names
if not run_id:
Utils.set_rapids_tools_env('RUN_ID', exec_dir_name)
self.set_ctxt('execFullName', exec_dir_name)
# create the local dependency folder
self._set_local_dep_dir()
Expand Down
24 changes: 18 additions & 6 deletions user_tools/src/spark_rapids_tools/utils/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ def stringify_path(fpath) -> str:
return os.path.abspath(expanded_path)


def resolve_and_prepare_log_file(tools_home_dir: str) -> str:
    """Build the per-run log file path, create its directory, and export it.

    The file name is derived from the RUN_ID environment entry so the log
    file, output folder, and log lines all share one run identifier.

    :param tools_home_dir: root of the tools home (e.g. ``~/.spark_rapids_tools``).
    :return: path of the log file (``<home>/logs/<RUN_ID>.log``).
    """
    run_id = Utils.get_or_set_rapids_tools_env('RUN_ID')
    # NOTE(review): if RUN_ID is unset this yields '<logs>/None.log'; callers
    # (init_environment) appear to set RUN_ID first — confirm for non-CLI use.
    log_dir = f'{tools_home_dir}/logs'
    log_file = f'{log_dir}/{run_id}.log'
    # Create the directory before exporting LOG_FILE, so the env var never
    # points at a location whose creation failed.
    FSUtil.make_dirs(log_dir)
    Utils.set_rapids_tools_env('LOG_FILE', log_file)
    return log_file


def is_http_file(value: Any) -> bool:
try:
TypeAdapter(AnyHttpUrl).validate_python(value)
Expand Down Expand Up @@ -195,14 +204,17 @@ def init_environment(short_name: str) -> str:
tools_home_dir = FSUtil.build_path(home_dir, '.spark_rapids_tools')
Utils.set_rapids_tools_env('HOME', tools_home_dir)

# Set the 'LOG_FILE' environment variable and create the log directory.
log_dir = f'{tools_home_dir}/logs'
log_file = f'{log_dir}/{short_name}_{uuid}.log'
Utils.set_rapids_tools_env('LOG_FILE', log_file)
FSUtil.make_dirs(log_dir)
# 'RUN_ID' is used to create a common ID across a tools execution.
# This ID unifies -
# * Appended to loggers for adding meta information
# * For creating the output_directory with the same ID
# * For creating local dependency work folders
# * For creating the log file with the same ID
Utils.set_rapids_tools_env('RUN_ID', f'{short_name}_{uuid}')

# Print the log file location
log_file = resolve_and_prepare_log_file(tools_home_dir)
print(Utils.gen_report_sec_header('Application Logs'))
print(f"Run ID : {Utils.get_or_set_rapids_tools_env('RUN_ID')}")
print(f'Location: {log_file}')
print('In case of any errors, please share the log file with the Spark RAPIDS team.\n')

Expand Down