From 2f0d99bb2d8ea7507a1e71e5e56f9a84d9d1b845 Mon Sep 17 00:00:00 2001
From: Shreyanand
Date: Wed, 11 Dec 2024 16:45:34 -0500
Subject: [PATCH] Fix pipeline errors

Build the master and worker volume mount lists directly instead of
assigning the None return value of list.append, drop the stray second
argument passed to f.write, and pass job_kind instead of the undefined
name kind to get_job_logs.

---
 pipeline.yaml          | 98 +++++++++++++++++++++---------------
 training/components.py | 25 ++++++-----
 2 files changed, 62 insertions(+), 61 deletions(-)

diff --git a/pipeline.yaml b/pipeline.yaml
index a8c4aa5..0085669 100644
--- a/pipeline.yaml
+++ b/pipeline.yaml
@@ -829,28 +829,27 @@ deploymentSpec:
\ models.V1Volume(\n name=\"model\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\
\ claim_name=model_pvc_name\n ),\n ),\n\
\ models.V1Volume(\n name=\"output\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\
- \ claim_name=output_pvc_name\n ),\n )\n\
- \ ]\n\n # Set volume mounts\n volume_mounts_common = [\n \
+ \ claim_name=output_pvc_name\n ),\n ),\n\
+ \ ]\n\n # Set volume mounts\n volume_mounts_master = [\n \
\ \ models.V1VolumeMount(\n mount_path=\"/input_data\", name=\"\
- input-data\", read_only=True\n ),\n models.V1VolumeMount(\n\
- \ mount_path=\"/input_model\", name=\"model\", read_only=True\n\
- \ )\n ]\n volume_mounts_master = volume_mounts_common.append(\n\
- \ models.V1VolumeMount(\n mount_path=\"/output\", name=\"\
- output\"\n )\n )\n volume_mounts_worker = volume_mounts_common.append(\n\
- \ models.V1VolumeMount(\n mount_path=\"/output\", name=\"\
- output\", read_only=True\n )\n )\n\n # Set env variables\n\
+ input-data\", read_only=True\n ),\n models.V1VolumeMount(mount_path=\"\
+ /input_model\", name=\"model\", read_only=True),\n models.V1VolumeMount(mount_path=\"\
+ /output\", name=\"output\")\n ]\n\n volume_mounts_worker = [\n \
+ \ models.V1VolumeMount(\n mount_path=\"/input_data\", name=\"\
+ input-data\", read_only=True\n ),\n models.V1VolumeMount(mount_path=\"\
+ /input_model\", name=\"model\", read_only=True),\n models.V1VolumeMount(mount_path=\"\
+ /output\", name=\"output\", read_only=True)\n ]\n\n # Set env variables\n\
\ env_vars = [\n models.V1EnvVar(name=\"NNODES\", value=f\"{nnodes}\"\
- ),\n models.V1EnvVar(\n name=\"NPROC_PER_NODE\", value=f\"\
- {nproc_per_node}\"\n ),\n models.V1EnvVar(name=\"XDG_CACHE_HOME\"\
- , value=\"/tmp\"),\n models.V1EnvVar(name=\"TRITON_CACHE_DIR\", value=\"\
- /tmp\"),\n models.V1EnvVar(name=\"HF_HOME\", value=\"/tmp\"),\n \
- \ models.V1EnvVar(name=\"TRANSFORMERS_CACHE\", value=\"/tmp\")\n \
- \ ]\n\n # Get master and worker container specs\n master_container_spec\
- \ = kfto_utils.get_container_spec(\n base_image=base_image,\n \
- \ name=\"pytorch\",\n resources=resources_per_worker,\n \
- \ volume_mounts=volume_mounts_master,\n )\n\n # In the next release\
- \ of kubeflow-training, the command\n # and the args will be a part of\
- \ kfto_utils.get_container_spec function\n master_container_spec.command\
+ ),\n models.V1EnvVar(name=\"NPROC_PER_NODE\", value=f\"{nproc_per_node}\"\
+ ),\n models.V1EnvVar(name=\"XDG_CACHE_HOME\", value=\"/tmp\"),\n\
+ \ models.V1EnvVar(name=\"TRITON_CACHE_DIR\", value=\"/tmp\"),\n \
+ \ models.V1EnvVar(name=\"HF_HOME\", value=\"/tmp\"),\n models.V1EnvVar(name=\"\
+ TRANSFORMERS_CACHE\", value=\"/tmp\"),\n ]\n\n # Get master and worker\
+ \ container specs\n master_container_spec = kfto_utils.get_container_spec(\n\
+ \ base_image=base_image,\n name=\"pytorch\",\n resources=resources_per_worker,\n\
+ \ volume_mounts=volume_mounts_master,\n )\n\n # In the next\
+ \ release of kubeflow-training, the command\n # and the args will be\
+ \ a part of kfto_utils.get_container_spec function\n master_container_spec.command\
\ = command\n master_container_spec.args = master_args\n\n master_container_spec.env\
\ = env_vars\n\n worker_container_spec = kfto_utils.get_container_spec(\n\
\ base_image=base_image,\n name=\"pytorch\",\n resources=resources_per_worker,\n\
@@ -871,9 +870,9 @@ deploymentSpec:
\ master_pod_template_spec=master_pod_template_spec,\n num_workers=nnodes,\n\
\ num_procs_per_worker=nproc_per_node,\n )\n # Save the pytorch\
\ job yaml for record keeping and debugging\n with open(pytorchjob_output_yaml.path,\
- \ \"w\", encoding=\"utf-8\") as f:\n f.write(job_template.to_str(),\
- \ f)\n\n # Run the pytorch job\n logging.info(f\"Creating PyTorchJob\
- \ in namespace: {namespace}\")\n training_client.create_job(job_template,\
+ \ \"w\", encoding=\"utf-8\") as f:\n f.write(job_template.to_str())\n\
+ \n # Run the pytorch job\n logging.info(f\"Creating PyTorchJob in\
+ \ namespace: {namespace}\")\n training_client.create_job(job_template,\
\ namespace=namespace)\n\n expected_conditions = [\"Succeeded\", \"Failed\"\
]\n logging.info(f\"Monitoring job until status is any of {expected_conditions}.\"\
)\n\n def wait_for_job_get_logs(\n name: str,\n namespace:\
@@ -892,8 +891,8 @@ deploymentSpec:
\ if kfto_utils.has_condition(conditions, expected_condition):\n \
\ return conditions\n\n # Get logs dictionary\n\
\ logs_dict, _ = training_client.get_job_logs(\n \
- \ name=name, namespace=namespace, job_kind=kind\n )\n\n \
- \ # Stream new log lines\n for key, value in logs_dict.items():\n\
+ \ name=name, namespace=namespace, job_kind=job_kind\n )\n\n\
+ \ # Stream new log lines\n for key, value in logs_dict.items():\n\
\ if key not in log_lines:\n logging.info(key)\n\
\ log_lines.add(key)\n\n for line in value.split(\"\
\\n\"):\n if line not in log_lines:\n \
@@ -995,28 +994,27 @@ deploymentSpec:
\ models.V1Volume(\n name=\"model\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\
\ claim_name=model_pvc_name\n ),\n ),\n\
\ models.V1Volume(\n name=\"output\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\
- \ claim_name=output_pvc_name\n ),\n )\n\
- \ ]\n\n # Set volume mounts\n volume_mounts_common = [\n \
+ \ claim_name=output_pvc_name\n ),\n ),\n\
+ \ ]\n\n # Set volume mounts\n volume_mounts_master = [\n \
\ \ models.V1VolumeMount(\n mount_path=\"/input_data\", name=\"\
- input-data\", read_only=True\n ),\n models.V1VolumeMount(\n\
- \ mount_path=\"/input_model\", name=\"model\", read_only=True\n\
- \ )\n ]\n volume_mounts_master = volume_mounts_common.append(\n\
- \ models.V1VolumeMount(\n mount_path=\"/output\", name=\"\
- output\"\n )\n )\n volume_mounts_worker = volume_mounts_common.append(\n\
- \ models.V1VolumeMount(\n mount_path=\"/output\", name=\"\
- output\", read_only=True\n )\n )\n\n # Set env variables\n\
+ input-data\", read_only=True\n ),\n models.V1VolumeMount(mount_path=\"\
+ /input_model\", name=\"model\", read_only=True),\n models.V1VolumeMount(mount_path=\"\
+ /output\", name=\"output\")\n ]\n\n volume_mounts_worker = [\n \
+ \ models.V1VolumeMount(\n mount_path=\"/input_data\", name=\"\
+ input-data\", read_only=True\n ),\n models.V1VolumeMount(mount_path=\"\
+ /input_model\", name=\"model\", read_only=True),\n models.V1VolumeMount(mount_path=\"\
+ /output\", name=\"output\", read_only=True)\n ]\n\n # Set env variables\n\
\ env_vars = [\n models.V1EnvVar(name=\"NNODES\", value=f\"{nnodes}\"\
- ),\n models.V1EnvVar(\n name=\"NPROC_PER_NODE\", value=f\"\
- {nproc_per_node}\"\n ),\n models.V1EnvVar(name=\"XDG_CACHE_HOME\"\
- , value=\"/tmp\"),\n models.V1EnvVar(name=\"TRITON_CACHE_DIR\", value=\"\
- /tmp\"),\n models.V1EnvVar(name=\"HF_HOME\", value=\"/tmp\"),\n \
- \ models.V1EnvVar(name=\"TRANSFORMERS_CACHE\", value=\"/tmp\")\n \
- \ ]\n\n # Get master and worker container specs\n master_container_spec\
- \ = kfto_utils.get_container_spec(\n base_image=base_image,\n \
- \ name=\"pytorch\",\n resources=resources_per_worker,\n \
- \ volume_mounts=volume_mounts_master,\n )\n\n # In the next release\
- \ of kubeflow-training, the command\n # and the args will be a part of\
- \ kfto_utils.get_container_spec function\n master_container_spec.command\
+ ),\n models.V1EnvVar(name=\"NPROC_PER_NODE\", value=f\"{nproc_per_node}\"\
+ ),\n models.V1EnvVar(name=\"XDG_CACHE_HOME\", value=\"/tmp\"),\n\
+ \ models.V1EnvVar(name=\"TRITON_CACHE_DIR\", value=\"/tmp\"),\n \
+ \ models.V1EnvVar(name=\"HF_HOME\", value=\"/tmp\"),\n models.V1EnvVar(name=\"\
+ TRANSFORMERS_CACHE\", value=\"/tmp\"),\n ]\n\n # Get master and worker\
+ \ container specs\n master_container_spec = kfto_utils.get_container_spec(\n\
+ \ base_image=base_image,\n name=\"pytorch\",\n resources=resources_per_worker,\n\
+ \ volume_mounts=volume_mounts_master,\n )\n\n # In the next\
+ \ release of kubeflow-training, the command\n # and the args will be\
+ \ a part of kfto_utils.get_container_spec function\n master_container_spec.command\
\ = command\n master_container_spec.args = master_args\n\n master_container_spec.env\
\ = env_vars\n\n worker_container_spec = kfto_utils.get_container_spec(\n\
\ base_image=base_image,\n name=\"pytorch\",\n resources=resources_per_worker,\n\
@@ -1037,9 +1035,9 @@ deploymentSpec:
\ master_pod_template_spec=master_pod_template_spec,\n num_workers=nnodes,\n\
\ num_procs_per_worker=nproc_per_node,\n )\n # Save the pytorch\
\ job yaml for record keeping and debugging\n with open(pytorchjob_output_yaml.path,\
- \ \"w\", encoding=\"utf-8\") as f:\n f.write(job_template.to_str(),\
- \ f)\n\n # Run the pytorch job\n logging.info(f\"Creating PyTorchJob\
- \ in namespace: {namespace}\")\n training_client.create_job(job_template,\
+ \ \"w\", encoding=\"utf-8\") as f:\n f.write(job_template.to_str())\n\
+ \n # Run the pytorch job\n logging.info(f\"Creating PyTorchJob in\
+ \ namespace: {namespace}\")\n training_client.create_job(job_template,\
\ namespace=namespace)\n\n expected_conditions = [\"Succeeded\", \"Failed\"\
]\n logging.info(f\"Monitoring job until status is any of {expected_conditions}.\"\
)\n\n def wait_for_job_get_logs(\n name: str,\n namespace:\
@@ -1058,8 +1056,8 @@ deploymentSpec:
\ if kfto_utils.has_condition(conditions, expected_condition):\n \
\ return conditions\n\n # Get logs dictionary\n\
\ logs_dict, _ = training_client.get_job_logs(\n \
- \ name=name, namespace=namespace, job_kind=kind\n )\n\n \
- \ # Stream new log lines\n for key, value in logs_dict.items():\n\
+ \ name=name, namespace=namespace, job_kind=job_kind\n )\n\n\
+ \ # Stream new log lines\n for key, value in logs_dict.items():\n\
\ if key not in log_lines:\n logging.info(key)\n\
\ log_lines.add(key)\n\n for line in value.split(\"\
\\n\"):\n if line not in log_lines:\n \
diff --git a/training/components.py b/training/components.py
index adc804e..2be28c0 100644
--- a/training/components.py
+++ b/training/components.py
@@ -144,12 +144,12 @@ def pytorch_job_launcher_op(
     job_timeout: int = 86400,
     delete_after_done: bool = False,
 ):
-    import time
     import logging
-    from kubeflow.training import TrainingClient
-    from kubeflow.training.utils import utils as kfto_utils
-    from kubeflow.training import models
     import os
+    import time
+
+    from kubeflow.training import TrainingClient, models
+    from kubeflow.training.utils import utils as kfto_utils
 
     def list_phase1_final_model():
         model_dir = "/output/phase_1/model/hf_format"
@@ -256,18 +256,21 @@ def list_phase1_final_model():
     ]
 
     # Set volume mounts
-    volume_mounts_common = [
+    volume_mounts_master = [
         models.V1VolumeMount(
             mount_path="/input_data", name="input-data", read_only=True
         ),
         models.V1VolumeMount(mount_path="/input_model", name="model", read_only=True),
-    ]
-    volume_mounts_master = volume_mounts_common.append(
         models.V1VolumeMount(mount_path="/output", name="output")
-    )
-    volume_mounts_worker = volume_mounts_common.append(
+    ]
+
+    volume_mounts_worker = [
+        models.V1VolumeMount(
+            mount_path="/input_data", name="input-data", read_only=True
+        ),
+        models.V1VolumeMount(mount_path="/input_model", name="model", read_only=True),
         models.V1VolumeMount(mount_path="/output", name="output", read_only=True)
-    )
+    ]
 
     # Set env variables
     env_vars = [
@@ -337,7 +340,7 @@ def list_phase1_final_model():
     )
     # Save the pytorch job yaml for record keeping and debugging
     with open(pytorchjob_output_yaml.path, "w", encoding="utf-8") as f:
-        f.write(job_template.to_str(), f)
+        f.write(job_template.to_str())
 
     # Run the pytorch job
     logging.info(f"Creating PyTorchJob in namespace: {namespace}")
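
Note on the volume-mount fix (illustrative sketch, not taken from the
patched files): list.append() mutates the list in place and returns
None, so the old assignments left volume_mounts_master and
volume_mounts_worker holding None rather than lists, and the container
specs would have received None instead of any mounts. The mount names
below are hypothetical stand-ins:

    mounts_common = ["/input_data", "/input_model"]

    # Bug pattern from the old code: append() returns None,
    # so the assignment stores None, not a list.
    mounts_master = mounts_common.append("/output")
    assert mounts_master is None
    assert mounts_common == ["/input_data", "/input_model", "/output"]

    # Pattern the patch adopts: build each list as its own literal.
    mounts_master = ["/input_data", "/input_model", "/output"]
    mounts_worker = ["/input_data", "/input_model", "/output"]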
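
Note on the other two fixes (same caveat, illustrative only):
file.write() takes a single string argument, so the old call
f.write(job_template.to_str(), f) raised TypeError before the
PyTorchJob YAML could be saved; and get_job_logs() was being passed
the undefined name kind where the function's job_kind argument was
intended. A minimal sketch of the corrected write call, with a
placeholder string standing in for the rendered job template:

    with open("pytorchjob.yaml", "w", encoding="utf-8") as f:
        # f.write(yaml_str, f) would raise TypeError:
        # write() takes exactly one argument (2 given)
        f.write("kind: PyTorchJob\n")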