20 changes: 15 additions & 5 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
@@ -29,12 +29,14 @@
 from spark_rapids_pytools.common.sys_storage import FSUtil
 from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator
 from spark_rapids_pytools.rapids.qualification_core import QualificationCore
+from spark_rapids_tools.api_v1.builder import APIResultHandler
 from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel, SubmissionMode
 from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
 from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender
 from spark_rapids_tools.tools.core.qual_handler import QualCoreHandler
 from spark_rapids_tools.tools.qualx.qualx_main import predict
 from spark_rapids_tools.tools.qualification_stats_report import SparkQualificationStats
+from spark_rapids_tools.tools.qualx.revamp.x_main import predict_x
 from spark_rapids_tools.tools.speedup_category import SpeedupCategory
 from spark_rapids_tools.tools.top_candidates import TopCandidates
 from spark_rapids_tools.tools.unsupported_ops_stage_duration import UnsupportedOpsStageDuration
@@ -537,12 +539,20 @@ def __update_apps_with_prediction_info(self,
         model_name = self.ctxt.platform.get_prediction_model_name()
         qual_output_dir = self.ctxt.get_csp_output_path()
         output_info = self.__build_prediction_output_files_info()
-        qual_handler = self.ctxt.get_ctxt('qualHandler')
         try:
-            predictions_df = predict(platform=model_name, qual=qual_output_dir,
-                                     output_info=output_info,
-                                     model=estimation_model_args['customModelFile'],
-                                     qual_handlers=[qual_handler])
+            # Build the QualCore handler object to handle the prediction model output
+            q_core_handler = APIResultHandler().qual_core().with_path(qual_output_dir).build()
+            if Utils.get_rapids_tools_env('QUALX_REVAMP'):
+                predictions_df = predict_x(platform=model_name,
+                                           qual=qual_output_dir,
+                                           output_info=output_info,
+                                           model=estimation_model_args['customModelFile'],
+                                           qual_handlers=[q_core_handler])
+            else:
+                predictions_df = predict(platform=model_name, qual=qual_output_dir,
+                                         output_info=output_info,
+                                         model=estimation_model_args['customModelFile'],
+                                         qual_handlers=[q_core_handler])
Collaborator: We might want to eventually push the branching down into the core qualx APIs, just so all invocations to predict() can use the switch, but this is fine (and easier) for now.
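A minimal sketch of what that might look like, assuming `predict()` and `predict_x()` keep their current keyword signatures; the `predict_dispatch` name is hypothetical and not part of this PR:

```python
# Hypothetical helper (not in this PR): centralize the QUALX_REVAMP switch so
# callers never branch on the environment variable themselves.
from spark_rapids_pytools.common.utilities import Utils
from spark_rapids_tools.tools.qualx.qualx_main import predict
from spark_rapids_tools.tools.qualx.revamp.x_main import predict_x


def predict_dispatch(platform, qual, output_info, model, qual_handlers):
    """Route to the revamped prediction pipeline when QUALX_REVAMP is set."""
    impl = predict_x if Utils.get_rapids_tools_env('QUALX_REVAMP') else predict
    return impl(platform=platform, qual=qual, output_info=output_info,
                model=model, qual_handlers=qual_handlers)
```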

         except Exception as e:  # pylint: disable=broad-except
             predictions_df = pd.DataFrame()
             self.logger.error(
22 changes: 7 additions & 15 deletions user_tools/src/spark_rapids_pytools/rapids/qualx/prediction.py
@@ -18,33 +18,25 @@

 from spark_rapids_pytools.common.sys_storage import FSUtil
 from spark_rapids_pytools.rapids.qualx.qualx_tool import QualXTool
-from spark_rapids_tools.tools.core.qual_handler import QualCoreHandler
+from spark_rapids_tools.api_v1 import QualCoreResultHandler
+from spark_rapids_tools.api_v1.builder import APIResultHandler
 from spark_rapids_tools.tools.qualx.qualx_main import predict
 from spark_rapids_tools.tools.qualx.util import print_summary, print_speedup_summary


 @dataclass
 class Prediction(QualXTool):
     """
-    Wrapper layer around Prediction Tool.
-
-    Attributes
-    ----------
-    qual_output: str
-        Path to a directory containing qualification tool output.
-    qual_handler: QualCoreHandler
-        Handler for reading qualification core tool results.
+    A wrapper to run the QualX prediction stage on an existing Qual's output.
+    :param qual_output: Path to the directory containing the qualification tool output.
     """
     qual_output: str = None
-    qual_handler: QualCoreHandler = None

     name = 'prediction'

-    def __post_init__(self):
-        """Initialize the QualCoreHandler from qual_output."""
-        super().__post_init__()
-        if self.qual_output is not None:
-            self.qual_handler = QualCoreHandler(result_path=self.qual_output)
+    @property
+    def qual_handler(self) -> QualCoreResultHandler:
+        return APIResultHandler().qual_core().with_path(self.qual_output).build()

     def __prepare_prediction_output_info(self) -> dict:
         """
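For context, the property above replaces the eager `QualCoreHandler(result_path=...)` construction with a builder chain, so a fresh handler is built on every access and no handler state lives on the dataclass (which also removes the `__post_init__` ordering concern). A usage sketch based only on the calls visible in this diff; the path is a placeholder:

```python
from spark_rapids_tools.api_v1.builder import APIResultHandler

# Placeholder path for illustration only.
qual_output = '/path/to/qual_core_output'

# Fluent construction of a qual-core result handler, mirroring the new
# Prediction.qual_handler property.
handler = APIResultHandler().qual_core().with_path(qual_output).build()
```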
12 changes: 11 additions & 1 deletion user_tools/src/spark_rapids_tools/api_v1/__init__.py
@@ -32,6 +32,12 @@
     QualCoreResultHandler,
     ProfCoreResultHandler
 )
+from .builder import (
+    LoadRawFilesResult,
+    APIUtils,
+    CSVReportCombiner,
+    CSVReport
+)

 __all__ = [
     'AppHandler',
@@ -42,5 +48,9 @@
     'QualWrapperResultHandler',
     'QualCoreResultHandler',
     'ProfWrapperResultHandler',
-    'ProfCoreResultHandler'
+    'ProfCoreResultHandler',
+    'LoadRawFilesResult',
+    'APIUtils',
+    'CSVReportCombiner',
+    'CSVReport'
 ]
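With these re-exports in place, the builder helpers resolve from the package root as well as from the submodule; a small sketch:

```python
# Both import paths now resolve to the same symbols.
from spark_rapids_tools.api_v1 import CSVReport, CSVReportCombiner
from spark_rapids_tools.api_v1.builder import CSVReport as CSVReportDirect

assert CSVReport is CSVReportDirect
```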
154 changes: 148 additions & 6 deletions user_tools/src/spark_rapids_tools/api_v1/app_handler.py
@@ -13,12 +13,15 @@
 # limitations under the License.

 """module that defines the app descriptor for the results loaded by the tools."""

+import re
 from dataclasses import dataclass, field
 from functools import cached_property
-from typing import Optional
+from typing import Optional, List, Dict

 import pandas as pd
+from pydantic.alias_generators import to_camel

+from spark_rapids_tools.utils import Utilities

 @dataclass
@@ -32,6 +35,56 @@ class AppHandler(object):
     # this will be loaded from the core-status csv report
     eventlog_path: Optional[str] = None

+    @staticmethod
+    def get_pd_dtypes() -> Dict[str, str]:
+        """
+        Get the pandas data types for the AppHandler attributes.
+        :return: Dictionary mapping attribute names to pandas data types.
+        """
+        return {
+            'app_id': Utilities.scala_to_pandas_type('String'),
+            'attempt_id': Utilities.scala_to_pandas_type('Int'),
+            'app_name': Utilities.scala_to_pandas_type('String'),
+            'eventlog_path': Utilities.scala_to_pandas_type('String')
+        }
+
+    @staticmethod
+    def normalize_attribute(arg_value: str) -> str:
+        """
+        Normalize the attribute name to a plain format.
+        It uses re.sub to replace any '-' or '_' with a space, using the regexp r"(_|-)+".
+        Finally, it uses str.replace() to remove any spaces.
+        :param arg_value: the attribute name to normalize.
+        :return: the actual field name that is used in the AppHandler.
+        """
+        processed_value = re.sub(r'([_\-])+', ' ', arg_value.strip().lower()).replace(' ', '')
+        lookup_map = {
+            'appname': 'app_name',
+            'appid': 'app_id',
+            'attemptid': 'attempt_id',
+            'eventlogpath': 'eventlog_path'
+        }
+        return lookup_map.get(processed_value, arg_value)
+
+    @classmethod
+    def get_key_attributes(cls) -> List[str]:
+        """
+        Get the key attributes that define an AppHandler.
+        :return: List of key attributes.
+        """
+        return ['app_id']
+
+    @classmethod
+    def get_default_key_columns(cls) -> Dict[str, str]:
+        """
+        Get the default key columns for the AppHandler.
+        :return: Dictionary mapping attribute names to column names.
+        """
+        res = {}
+        for attr in cls.get_key_attributes():
+            res[attr] = to_camel(attr)
+        return res
+
     def is_name_defined(self) -> bool:
         """
         Check if the app name is defined.
@@ -57,17 +110,37 @@ def uuid(self) -> str:
         """
         return self._app_id

-    def patch_into_df(self, df: pd.DataFrame) -> pd.DataFrame:
+    def patch_into_df(self,
+                      df: pd.DataFrame,
+                      col_names: Optional[List[str]] = None) -> pd.DataFrame:
         """
         Given a dataframe, this method will stitch the app_id and app-name to the dataframe.
         This can be useful in automatically adding the app-id/app-name to the data-frame
         :param df: the dataframe that we want to modify.
+        :param col_names: optional list of column names that define the app_id/app_name columns
+                          in the dataframe. The list is assumed to be in the order the columns
+                          should be inserted.
         :return: the resulting dataframe from adding the columns.
         """
-        # TODO: We should consider add UUID as well, and use that for the joins instead.
-        # append attempt_id to support multiple attempts
+        col_values = [self.app_id]
Collaborator: This feels weird: col_values is a fixed/default list of one value, while col_names can be a user-provided list with multiple values. Also, I'm not sure how I would patch app_name per the docstring. And what would happen if I passed in col_names=['one', 'two', 'three']? Seems like I'd get three columns with the same (appId) value?

Collaborator (Author): Good point! I believe a dev intending to stitch DFs using Apps (the default combiner) has to use a column name that can be mapped to the object; otherwise, they have to use a custom combiner. For example:

  • [appId, app_id, App ID, App Id...] -> all of these variations can be reduced to the app._app_id field; otherwise, there is no way to tell which col-name should get the value of app._app_id.
  • [app_name, appName, App Name...] -> similarly, all can be mapped to the app._app_name field.
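That reduction is what the `normalize_attribute` helper added earlier in this diff implements; a few illustrative calls, following its lookup table (assuming `AppHandler` is imported from the package root, per the `__all__` shown above):

```python
from spark_rapids_tools.api_v1 import AppHandler

# Variant spellings collapse to the same canonical field name.
assert AppHandler.normalize_attribute('App ID') == 'app_id'
assert AppHandler.normalize_attribute('appId') == 'app_id'
assert AppHandler.normalize_attribute('app-name') == 'app_name'
# Unmapped names pass through unchanged.
assert AppHandler.normalize_attribute('stageId') == 'stageId'
```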

+        if col_names is None:
+            # append attemptId to support multi-attempts
+            col_names = ['appId']
Copilot AI (Aug 4, 2025): The zip operation will only iterate over min(len(col_names), len(col_values)) items. Since col_values always has length 1 but col_names can have different lengths, this could skip columns or fail silently.

Suggested change:
            col_names = ['appId']
            # Ensure col_values matches col_names in length
            if len(col_values) == 1 and len(col_names) > 1:
                col_values = col_values * len(col_names)
            elif len(col_values) != len(col_names):
                raise ValueError("Length of col_values must be 1 or match length of col_names")

+        # Ensure col_values matches col_names in length
+        if len(col_values) == 1 and len(col_names) > 1:
+            col_values = col_values * len(col_names)
+        elif len(col_values) != len(col_names):
+            raise ValueError('Length of col_values must be 1 or match length of col_names')
         if not df.empty:
-            df.insert(0, 'attemptId', self._attempt_id)
-            df.insert(0, 'appId', self._app_id)
+            # TODO: We should consider add UUID as well, and use that for the joins instead.
+            for col_k, col_v in zip(reversed(col_names), reversed(col_values)):
+                if col_k not in df.columns:
+                    df.insert(0, col_k, col_v)
+                else:
+                    # if the column already exists, we should not overwrite it
+                    # this is useful when we want to patch the app_id/app_name to an existing dataframe
+                    df[col_k] = col_v
         return df

     @property
@@ -121,3 +194,72 @@ def merge(self, other: 'AppHandler') -> 'AppHandler':
         if self.eventlog_path is None and other.eventlog_path is not None:
             self.eventlog_path = other.eventlog_path
         return self
+
+    ################################
+    # Public Methods
+    ################################
+
+    def convert_to_df(self) -> pd.DataFrame:
+        """
+        Convert the AppHandler attributes to a DataFrame.
+        :return: DataFrame with app_id, attempt_id, app_name, and eventlog_path as columns.
+        """
+        data = {
+            'app_id': [self.app_id],
+            'attempt_id': [self.attempt_id],
+            'app_name': [self.app_name],
+            'eventlog_path': [self.eventlog_path]
+        }
+        data_types = AppHandler.get_pd_dtypes()
+        return pd.DataFrame({
+            col: pd.Series(data[col], dtype=dtype) for col, dtype in data_types.items()
+        })
+
+    def add_fields_to_dataframe(self,
+                                df: pd.DataFrame,
+                                field_to_col_map: Dict[str, str]) -> pd.DataFrame:
+        """
+        Insert fields/properties from AppHandler into the DataFrame, with user-specified column names.
+        :param df: Existing DataFrame to append to.
+        :type df: pd.DataFrame
+        :param field_to_col_map: Dictionary mapping AppHandler attributes (keys) to DataFrame column names (values).
+        :type field_to_col_map: Dict[str, str]
+        :return: the DataFrame with the mapped columns added.
+        """
+        converted_df = self.convert_to_df()
+        row_data = []
+        for attr, col in field_to_col_map.items():
+            # Normalize the attribute name
+            norm_attr = AppHandler.normalize_attribute(attr)
+            try:
+                value = getattr(self, norm_attr)
+                row_data.append((col, norm_attr, value))
+            except AttributeError as exc:
+                raise AttributeError(f"Attribute '{attr}' not found in AppHandler.") from exc
+        for col, norm_attr, value in reversed(row_data):
+            # Check if the column already exists in the DataFrame
+            if col in df.columns:
+                # If it exists, we should not overwrite it; skip.
+                continue
+            # Create a new column with the correct type. We do this because we do not
+            # want to add values to an empty dataframe.
+            df.insert(loc=0, column=col, value=pd.Series(dtype=converted_df[norm_attr].dtype))
+            # Set the values in case the dataframe was non-empty.
+            df[col] = pd.Series([value] * len(df), dtype=converted_df[norm_attr].dtype)
+        return df
+
+    @classmethod
+    def inject_into_df(cls,
+                       df: pd.DataFrame,
+                       field_to_col_map: Dict[str, str],
+                       app_h: Optional['AppHandler'] = None) -> pd.DataFrame:
+        """
+        Inject AppHandler fields into a DataFrame using a mapping of field names to column names.
+        :param df: the DataFrame to patch.
+        :param field_to_col_map: mapping of AppHandler field names to DataFrame column names.
+        :param app_h: optional AppHandler to pull values from; a placeholder app is used when omitted.
+        :return: the patched DataFrame.
+        """
+        if app_h is None:
+            app_h = AppHandler(_app_id='UNKNOWN_APP', _app_name='UNKNOWN_APP', _attempt_id=1)
+        return app_h.add_fields_to_dataframe(df, field_to_col_map)
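Tying the new pieces together, a short usage sketch of the patching APIs above; the app values and frames are made up for illustration:

```python
import pandas as pd

from spark_rapids_tools.api_v1 import AppHandler

app = AppHandler(_app_id='app-001', _app_name='etl-job', _attempt_id=1)
df = pd.DataFrame({'sqlID': [1, 2]})

# Default patch: a single 'appId' column is stitched onto the frame.
patched = app.patch_into_df(df.copy())

# Explicit field-to-column mapping: variant attribute spellings are
# normalized, and columns that already exist are skipped.
mapped = app.add_fields_to_dataframe(df.copy(), {'App ID': 'appId', 'appName': 'appName'})

# Class-level entry point that falls back to an UNKNOWN_APP placeholder app.
injected = AppHandler.inject_into_df(df.copy(), {'app_id': 'appId'})
```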