Skip to content

Commit

Permalink
Rename App Execution Engine to Execution Engine
Browse files Browse the repository at this point in the history
Signed-off-by: Partho Sarthi <[email protected]>
  • Loading branch information
parthosa committed Nov 2, 2024
1 parent 8921a85 commit bf9d0d0
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 33 deletions.
36 changes: 18 additions & 18 deletions user_tools/src/spark_rapids_pytools/rapids/qualification.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from spark_rapids_pytools.common.sys_storage import FSUtil
from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator
from spark_rapids_pytools.rapids.rapids_tool import RapidsJarTool
from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel, CspEnv, AppExecutionEngine
from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel, CspEnv, ExecutionEngine
from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics
from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender
from spark_rapids_tools.tools.qualx.qualx_main import predict
Expand Down Expand Up @@ -320,7 +320,7 @@ def __build_global_report_summary(self,

# Assign execution engine (Spark/Photon etc.) and speedup categories (Small/Medium/Large) to each application.
# Note: Strategy for speedup categorization will be based on the execution engine of the application.
apps_with_exec_engine_df = self._assign_app_execution_engine(apps_grouped_df)
apps_with_exec_engine_df = self._assign_execution_engine_to_apps(apps_grouped_df)
speedup_category_confs = self.ctxt.get_value('local', 'output', 'speedupCategories')
speedup_category_ob = SpeedupCategory(speedup_category_confs)
df_final_result = speedup_category_ob.build_category_column(apps_with_exec_engine_df)
Expand Down Expand Up @@ -612,7 +612,7 @@ def _read_qualification_output_file(self, report_name_key: str, file_format_key:

def _read_qualification_metric_file(self, file_name: str) -> Dict[str, pd.DataFrame]:
"""
Helper method to read and aggregate metric files from the qualification tool's output metric folder.
Helper method to read metric files from the qualification tool's output metric folder.
Returns a dictionary of DataFrames, where each key is an application ID, and each
DataFrame contains the corresponding application's metrics data.
Example:
Expand All @@ -638,35 +638,35 @@ def _read_qualification_metric_file(self, file_name: str) -> Dict[str, pd.DataFr
app_id_name, type(e).__name__, e)
return metrics

def _assign_execution_engine_to_apps(self, tools_processed_apps: pd.DataFrame) -> pd.DataFrame:
    """
    Assign the execution engine (Spark/Photon) to each application.

    The assigned engine is later used to pick the speedup-categorization strategy
    (Small/Medium/Large) for each application.

    :param tools_processed_apps: DataFrame of processed applications; must contain an 'App ID' column.
    :return: The same DataFrame with the execution-engine column added.
    """
    spark_properties = self._read_qualification_metric_file('spark_properties.csv')
    default_exec_engine_type = ExecutionEngine.get_default().value
    exec_engine_col_name = self.ctxt.get_value('local', 'output', 'speedupCategories', 'execEngineColumnName')

    # Default to Spark-based execution type for non-Databricks platforms.
    # Photon is only available on Databricks, so other platforms are always Spark.
    if self.ctxt.platform.get_platform_name() not in {CspEnv.DATABRICKS_AWS, CspEnv.DATABRICKS_AZURE}:
        tools_processed_apps[exec_engine_col_name] = default_exec_engine_type
        return tools_processed_apps

    # Create a map of App IDs to their execution engine type (Spark/Photon).
    # The engine is detected from the Databricks runtime version string in the app's Spark properties.
    spark_version_key = 'spark.databricks.clusterUsageTags.sparkVersion'
    exec_engine_map = {}

    for app_id, props_df in spark_properties.items():
        props_dict = Utilities.convert_df_to_dict(props_df)
        spark_version = props_dict.get(spark_version_key, '').lower()
        if ExecutionEngine.PHOTON.value.lower() in spark_version:
            exec_engine_map[app_id] = ExecutionEngine.PHOTON.value
        else:
            exec_engine_map[app_id] = default_exec_engine_type

    # Assign the execution engine type to each application DataFrame row.
    # Apps with no matching spark_properties entry fall back to the default engine.
    tools_processed_apps[exec_engine_col_name] = (
        tools_processed_apps['App ID'].map(exec_engine_map).fillna(default_exec_engine_type)
    )
    return tools_processed_apps

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ local:
- 'App Name'
- 'Event Log'
- 'Cluster Info'
- 'App Execution Engine'
- 'Execution Engine'
- 'Estimated GPU Speedup Category'
- 'Full Cluster Config Recommendations*'
- 'GPU Config Recommendation Breakdown*'
Expand Down Expand Up @@ -256,7 +256,7 @@ local:
speedupColumnName: 'Estimated GPU Speedup'
categoryColumnName: 'Estimated GPU Speedup Category'
heuristicsColumnName: 'Skip by Heuristics'
appExecTypeColumnName: 'App Execution Engine'
execEngineColumnName: 'Execution Engine'
defaultCategory: 'Not Recommended'
strategies:
spark: # Spark specific speedup categories
Expand Down
4 changes: 2 additions & 2 deletions user_tools/src/spark_rapids_tools/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,13 @@ def create_default_model_args(cls, model_type: str) -> dict:
}


class ExecutionEngine(EnumeratedType):
    """
    Represents the execution engine for the application (Spark or Photon).
    """
    SPARK = 'spark'
    PHOTON = 'photon'

    @classmethod
    def get_default(cls) -> 'ExecutionEngine':
        # Spark is the baseline engine; Photon is only detected on Databricks platforms.
        return cls.SPARK
20 changes: 11 additions & 9 deletions user_tools/src/spark_rapids_tools/tools/speedup_category.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ class SpeedupCategory:

def __post_init__(self):
    """
    Build one SpeedupStrategy per execution engine type from the configured
    'strategies' properties (e.g. separate Spark and Photon category thresholds).
    """
    strategy_properties = self.props.get('strategies', {})
    # Create a SpeedupStrategy for each execution engine type.
    for exec_engine, properties in strategy_properties.items():  # type: str, dict
        self.speedup_strategies[exec_engine] = SpeedupStrategy(properties)

def _build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame:
"""
Expand All @@ -71,12 +71,13 @@ def _build_category_column(self, all_apps: pd.DataFrame) -> pd.DataFrame:
"""
category_col_name = self.props.get('categoryColumnName')
speedup_col_name = self.props.get('speedupColumnName')
app_exec_col_name = self.props.get('appExecTypeColumnName')
exec_engine_col_name = self.props.get('execEngineColumnName')

# Calculate the category based on the speedup value
def calculate_category(single_row: pd.Series) -> Optional[str]:
app_exec_engine = single_row.get(app_exec_col_name)
categories = self.speedup_strategies.get(app_exec_engine).get_categories()
exec_engine = single_row.get(exec_engine_col_name)
# Get the speedup strategy and its categories for the given execution engine type.
categories = self.speedup_strategies.get(exec_engine).get_categories()
col_value = single_row.get(speedup_col_name)
for category in categories:
if category.get('lowerBound') <= col_value < category.get('upperBound'):
Expand Down Expand Up @@ -104,11 +105,12 @@ def _process_category(self, all_apps: pd.DataFrame) -> pd.DataFrame:
"""
category_col_name = self.props.get('categoryColumnName')
heuristics_col_name = self.props.get('heuristicsColumnName')
app_exec_col_name = self.props.get('appExecTypeColumnName')
exec_engine_col_name = self.props.get('execEngineColumnName')

def process_row(single_row: pd.Series) -> str:
app_exec_engine = single_row.get(app_exec_col_name)
eligibility_conditions = self.speedup_strategies.get(app_exec_engine).get_eligibility_conditions()
exec_engine = single_row.get(exec_engine_col_name)
# Get the speedup strategy and its eligibility conditions for the given execution engine type.
eligibility_conditions = self.speedup_strategies.get(exec_engine).get_eligibility_conditions()
for entry in eligibility_conditions:
col_value = single_row[entry.get('columnName')]
# If the row is marked to be skipped by heuristics or the value is not within the range,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def get_tools_root_path() -> str:
@staticmethod
def get_tools_output_dir(log_str: str) -> Optional[str]:
"""
Extracts the output directory path from the given log string.
Extracts the output directory path from the given log string (e.g. /path/to/qual_2024xxx)
:param log_str: Log string containing the output directory path.
:return: Directory path if found, otherwise None.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def step_verify_metadata_file(context, metadata_file: str, execution_engines: st
)
# Verify that each app's execution engine matches the expected value
for metadata in app_metadata:
actual_engine = metadata.get('appExecutionEngine')
actual_engine = metadata.get('executionEngine')
event_log_path = urlparse(metadata.get('eventLog')).path
expected_engine = exec_engine_map.get(event_log_path)
assert actual_engine == expected_engine, (
Expand Down

0 comments on commit bf9d0d0

Please sign in to comment.