Skip to content

Commit fa5a565

Browse files
committed
Fix formatting
1 parent afa7ccf commit fa5a565

File tree

1 file changed

+119
-86
lines changed

1 file changed

+119
-86
lines changed

training/components.py

+119-86
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88
from utils.consts import PYTHON_IMAGE, RHELAI_IMAGE, TOOLBOX_IMAGE
99

1010

11-
@dsl.component(base_image=RHELAI_IMAGE,
12-
install_kfp_package=False,
13-
)
11+
@dsl.component(
12+
base_image=RHELAI_IMAGE,
13+
install_kfp_package=False,
14+
)
1415
def data_processing_op(
1516
model_path: str = "/model",
1617
sdg_path: str = "/data/sdg",
@@ -121,6 +122,7 @@ def knowledge_processed_data_to_artifact_op(
121122
[f"cp -r {pvc_path} {knowledge_processed_data.path}"],
122123
)
123124

125+
124126
# Change base image to the RHOAI python image with kubeflow_training once available
125127
@dsl.component(base_image="quay.io/redhat-et/ilab:shrey", install_kfp_package=False)
126128
def pytorch_job_launcher_op(
@@ -149,14 +151,14 @@ def pytorch_job_launcher_op(
149151
import os
150152

151153
def list_phase1_final_model():
152-
model_dir = "/output/phase_1/model/hf_format"
153-
ilab_models = os.listdir(model_dir)
154-
newest_idx = max(
155-
(os.path.getmtime(f"{model_dir}/{model}"), i)
156-
for i, model in enumerate(ilab_models)
157-
)[-1]
158-
newest_model = ilab_models[newest_idx]
159-
return f"{model_dir}/{newest_model}"
154+
model_dir = "/output/phase_1/model/hf_format"
155+
ilab_models = os.listdir(model_dir)
156+
newest_idx = max(
157+
(os.path.getmtime(f"{model_dir}/{model}"), i)
158+
for i, model in enumerate(ilab_models)
159+
)[-1]
160+
newest_model = ilab_models[newest_idx]
161+
return f"{model_dir}/{newest_model}"
160162

161163
if phase_num == 1:
162164
path_to_model = "/input_model"
@@ -167,14 +169,14 @@ def list_phase1_final_model():
167169
else:
168170
raise RuntimeError(f"Unsupported value of {phase_num=}")
169171

170-
resources_per_worker = {"cpu": "8",
171-
"nvidia.com/gpu": nproc_per_node}
172-
172+
resources_per_worker = {"cpu": "8", "nvidia.com/gpu": nproc_per_node}
173+
173174
base_image = "quay.io/redhat-et/ilab:shrey"
174175
name = f"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}"
175176
command = ["/bin/bash", "-c", "--"]
176177

177-
master_args = [f"""echo "Running phase {phase_num}"
178+
master_args = [
179+
f"""echo "Running phase {phase_num}"
178180
echo "Using {path_to_model} model for training"
179181
echo "Using {path_to_data} data for training"
180182
mkdir -p /output/phase_{phase_num}/model;
@@ -200,9 +202,11 @@ def list_phase1_final_model():
200202
--distributed_training_framework fsdp \
201203
--is_granite \
202204
--checkpoint_at_epoch
203-
"""]
204-
205-
worker_args = [f"""echo "Running phase {phase_num}"
205+
"""
206+
]
207+
208+
worker_args = [
209+
f"""echo "Running phase {phase_num}"
206210
echo "Using {path_to_model} model for training"
207211
echo "Using {path_to_data} data for training"
208212
mkdir -p /tmp/model;
@@ -227,69 +231,99 @@ def list_phase1_final_model():
227231
--distributed_training_framework fsdp \
228232
--is_granite \
229233
--checkpoint_at_epoch
230-
"""]
231-
234+
"""
235+
]
236+
232237
# Set volumes and volume mounts
233-
input_data_volume = models.V1Volume(name="input-data",
234-
persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(
235-
claim_name=input_pvc_name))
236-
input_model_volume = models.V1Volume(name="model",
237-
persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(
238-
claim_name=model_pvc_name))
239-
output_volume = models.V1Volume(name="output",
240-
persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(
241-
claim_name=output_pvc_name))
242-
243-
input_data_volume_mount = models.V1VolumeMount(mount_path="/input_data", name="input-data", read_only=True)
244-
input_model_volume_mount = models.V1VolumeMount(mount_path="/input_model", name="model", read_only=True)
245-
output_volume_mount_master = models.V1VolumeMount(mount_path="/output", name="output")
246-
output_volume_mount_worker = models.V1VolumeMount(mount_path="/output", name="output", read_only=True)
238+
input_data_volume = models.V1Volume(
239+
name="input-data",
240+
persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(
241+
claim_name=input_pvc_name
242+
),
243+
)
244+
input_model_volume = models.V1Volume(
245+
name="model",
246+
persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(
247+
claim_name=model_pvc_name
248+
),
249+
)
250+
output_volume = models.V1Volume(
251+
name="output",
252+
persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(
253+
claim_name=output_pvc_name
254+
),
255+
)
256+
257+
input_data_volume_mount = models.V1VolumeMount(
258+
mount_path="/input_data", name="input-data", read_only=True
259+
)
260+
input_model_volume_mount = models.V1VolumeMount(
261+
mount_path="/input_model", name="model", read_only=True
262+
)
263+
output_volume_mount_master = models.V1VolumeMount(
264+
mount_path="/output", name="output"
265+
)
266+
output_volume_mount_worker = models.V1VolumeMount(
267+
mount_path="/output", name="output", read_only=True
268+
)
247269

248270
# Set env variables
249-
nnodes_var = models.V1EnvVar(name="NNODES", value=f"{nnodes}")
250-
nproc_per_node_var = models.V1EnvVar(name="NPROC_PER_NODE", value=f"{nproc_per_node}")
271+
nnodes_var = models.V1EnvVar(name="NNODES", value=f"{nnodes}")
272+
nproc_per_node_var = models.V1EnvVar(
273+
name="NPROC_PER_NODE", value=f"{nproc_per_node}"
274+
)
251275
xdg_cache_var = models.V1EnvVar(name="XDG_CACHE_HOME", value="/tmp")
252276
triton_cache_var = models.V1EnvVar(name="TRITON_CACHE_DIR", value="/tmp")
253277
hf_home_var = models.V1EnvVar(name="HF_HOME", value="/tmp")
254278
transformers_cache_var = models.V1EnvVar(name="TRANSFORMERS_CACHE", value="/tmp")
255279

256-
257280
# Get master and worker container specs
258-
master_container_spec = utils.get_container_spec(base_image=base_image,
259-
name="pytorch",
260-
resources=resources_per_worker,
261-
volume_mounts=[input_data_volume_mount,
262-
input_model_volume_mount,
263-
output_volume_mount_master])
264-
265-
# In the next release of kubeflow-training, the command
266-
# and the args will be a part of utils.get_container_spec function
281+
master_container_spec = utils.get_container_spec(
282+
base_image=base_image,
283+
name="pytorch",
284+
resources=resources_per_worker,
285+
volume_mounts=[
286+
input_data_volume_mount,
287+
input_model_volume_mount,
288+
output_volume_mount_master,
289+
],
290+
)
291+
292+
# In the next release of kubeflow-training, the command
293+
# and the args will be a part of utils.get_container_spec function
267294
master_container_spec.command = command
268295
master_container_spec.args = master_args
269296

270-
271-
master_container_spec.env = [nnodes_var,
272-
nproc_per_node_var,
273-
xdg_cache_var,
274-
triton_cache_var,
275-
hf_home_var,
276-
transformers_cache_var]
277-
278-
worker_container_spec = utils.get_container_spec(base_image=base_image,
279-
name="pytorch",
280-
resources=resources_per_worker,
281-
volume_mounts=[input_data_volume_mount,
282-
input_model_volume_mount,
283-
output_volume_mount_worker])
297+
master_container_spec.env = [
298+
nnodes_var,
299+
nproc_per_node_var,
300+
xdg_cache_var,
301+
triton_cache_var,
302+
hf_home_var,
303+
transformers_cache_var,
304+
]
305+
306+
worker_container_spec = utils.get_container_spec(
307+
base_image=base_image,
308+
name="pytorch",
309+
resources=resources_per_worker,
310+
volume_mounts=[
311+
input_data_volume_mount,
312+
input_model_volume_mount,
313+
output_volume_mount_worker,
314+
],
315+
)
284316
worker_container_spec.command = command
285317
worker_container_spec.args = worker_args
286-
worker_container_spec.env = [nnodes_var,
287-
nproc_per_node_var,
288-
xdg_cache_var,
289-
triton_cache_var,
290-
hf_home_var,
291-
transformers_cache_var]
292-
318+
worker_container_spec.env = [
319+
nnodes_var,
320+
nproc_per_node_var,
321+
xdg_cache_var,
322+
triton_cache_var,
323+
hf_home_var,
324+
transformers_cache_var,
325+
]
326+
293327
# create master pod spec
294328
master_pod_template_spec = utils.get_pod_template_spec(
295329
containers=[master_container_spec],
@@ -303,22 +337,24 @@ def list_phase1_final_model():
303337
)
304338

305339
logging.getLogger(__name__).setLevel(logging.INFO)
306-
logging.info('Generating job template.')
307-
logging.info('Creating TrainingClient.')
340+
logging.info("Generating job template.")
341+
logging.info("Creating TrainingClient.")
308342

309343
# Initialize training client
310344
# This also finds the namespace from /var/run/secrets/kubernetes.io/serviceaccount/namespace
311-
# And it also loads the kube config
345+
# And it also loads the kube config
312346
training_client = TrainingClient()
313347
namespace = training_client.namespace
314348

315349
# Create pytorch job spec
316-
job_template = utils.get_pytorchjob_template(name=name,
317-
namespace=namespace,
318-
worker_pod_template_spec=worker_pod_template_spec,
319-
master_pod_template_spec=master_pod_template_spec,
320-
num_workers=nnodes,
321-
num_procs_per_worker=nproc_per_node)
350+
job_template = utils.get_pytorchjob_template(
351+
name=name,
352+
namespace=namespace,
353+
worker_pod_template_spec=worker_pod_template_spec,
354+
master_pod_template_spec=master_pod_template_spec,
355+
num_workers=nnodes,
356+
num_procs_per_worker=nproc_per_node,
357+
)
322358

323359
print(job_template.to_str())
324360

@@ -328,9 +364,7 @@ def list_phase1_final_model():
328364
training_client.create_job(job_template, namespace=namespace)
329365

330366
expected_conditions = ["Succeeded", "Failed"]
331-
logging.info(
332-
f'Monitoring job until status is any of {expected_conditions}.'
333-
)
367+
logging.info(f"Monitoring job until status is any of {expected_conditions}.")
334368

335369
def wait_for_job_get_logs(
336370
name: str,
@@ -339,7 +373,7 @@ def wait_for_job_get_logs(
339373
expected_conditions: list = ["Succeeded"],
340374
wait_timeout: int = 600,
341375
polling_interval: int = 15,
342-
timeout: int = 1000
376+
timeout: int = 1000,
343377
) -> str:
344378
log_lines = set()
345379
for _ in range(round(wait_timeout / polling_interval)):
@@ -358,12 +392,11 @@ def wait_for_job_get_logs(
358392
for expected_condition in expected_conditions:
359393
if utils.has_condition(conditions, expected_condition):
360394
return conditions
361-
395+
362396
# Get logs dictionary
363397
logs_dict, _ = training_client.get_job_logs(
364-
name=name,
365-
namespace=namespace,
366-
job_kind=kind)
398+
name=name, namespace=namespace, job_kind=kind
399+
)
367400

368401
# Stream new log lines
369402
for key, value in logs_dict.items():
@@ -384,8 +417,8 @@ def wait_for_job_get_logs(
384417
job_kind="PyTorchJob",
385418
expected_conditions=set(expected_conditions),
386419
wait_timeout=job_timeout,
387-
timeout=job_timeout
420+
timeout=job_timeout,
388421
)
389422
if delete_after_done:
390-
logging.info('Deleting job after completion.')
423+
logging.info("Deleting job after completion.")
391424
training_client.delete_job(name, namespace)

0 commit comments

Comments
 (0)