
Commit

Fix pipeline errors
Shreyanand committed Dec 11, 2024
1 parent b296fbd commit 67492e3
Showing 2 changed files with 62 additions and 61 deletions.
98 changes: 48 additions & 50 deletions pipeline.yaml
@@ -829,28 +829,27 @@ deploymentSpec:
\ models.V1Volume(\n name=\"model\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\
\ claim_name=model_pvc_name\n ),\n ),\n\
\ models.V1Volume(\n name=\"output\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\
\ claim_name=output_pvc_name\n ),\n )\n\
\ ]\n\n # Set volume mounts\n volume_mounts_common = [\n \
\ claim_name=output_pvc_name\n ),\n ),\n\
\ ]\n\n # Set volume mounts\n volume_mounts_master = [\n \
\ models.V1VolumeMount(\n mount_path=\"/input_data\", name=\"\
input-data\", read_only=True\n ),\n models.V1VolumeMount(\n\
\ mount_path=\"/input_model\", name=\"model\", read_only=True\n\
\ )\n ]\n volume_mounts_master = volume_mounts_common.append(\n\
\ models.V1VolumeMount(\n mount_path=\"/output\", name=\"\
output\"\n )\n )\n volume_mounts_worker = volume_mounts_common.append(\n\
\ models.V1VolumeMount(\n mount_path=\"/output\", name=\"\
output\", read_only=True\n )\n )\n\n # Set env variables\n\
input-data\", read_only=True\n ),\n models.V1VolumeMount(mount_path=\"\
/input_model\", name=\"model\", read_only=True),\n models.V1VolumeMount(mount_path=\"\
/output\", name=\"output\")\n ]\n\n volume_mounts_worker = [\n \
\ models.V1VolumeMount(\n mount_path=\"/input_data\", name=\"\
input-data\", read_only=True\n ),\n models.V1VolumeMount(mount_path=\"\
/input_model\", name=\"model\", read_only=True),\n models.V1VolumeMount(mount_path=\"\
/output\", name=\"output\", read_only=True)\n ]\n\n # Set env variables\n\
\ env_vars = [\n models.V1EnvVar(name=\"NNODES\", value=f\"{nnodes}\"\
),\n models.V1EnvVar(\n name=\"NPROC_PER_NODE\", value=f\"\
{nproc_per_node}\"\n ),\n models.V1EnvVar(name=\"XDG_CACHE_HOME\"\
, value=\"/tmp\"),\n models.V1EnvVar(name=\"TRITON_CACHE_DIR\", value=\"\
/tmp\"),\n models.V1EnvVar(name=\"HF_HOME\", value=\"/tmp\"),\n \
\ models.V1EnvVar(name=\"TRANSFORMERS_CACHE\", value=\"/tmp\")\n \
\ ]\n\n # Get master and worker container specs\n master_container_spec\
\ = kfto_utils.get_container_spec(\n base_image=base_image,\n \
\ name=\"pytorch\",\n resources=resources_per_worker,\n \
\ volume_mounts=volume_mounts_master,\n )\n\n # In the next release\
\ of kubeflow-training, the command\n # and the args will be a part of\
\ kfto_utils.get_container_spec function\n master_container_spec.command\
),\n models.V1EnvVar(name=\"NPROC_PER_NODE\", value=f\"{nproc_per_node}\"\
),\n models.V1EnvVar(name=\"XDG_CACHE_HOME\", value=\"/tmp\"),\n\
\ models.V1EnvVar(name=\"TRITON_CACHE_DIR\", value=\"/tmp\"),\n \
\ models.V1EnvVar(name=\"HF_HOME\", value=\"/tmp\"),\n models.V1EnvVar(name=\"\
TRANSFORMERS_CACHE\", value=\"/tmp\"),\n ]\n\n # Get master and worker\
\ container specs\n master_container_spec = kfto_utils.get_container_spec(\n\
\ base_image=base_image,\n name=\"pytorch\",\n resources=resources_per_worker,\n\
\ volume_mounts=volume_mounts_master,\n )\n\n # In the next\
\ release of kubeflow-training, the command\n # and the args will be\
\ a part of kfto_utils.get_container_spec function\n master_container_spec.command\
\ = command\n master_container_spec.args = master_args\n\n master_container_spec.env\
\ = env_vars\n\n worker_container_spec = kfto_utils.get_container_spec(\n\
\ base_image=base_image,\n name=\"pytorch\",\n resources=resources_per_worker,\n\
@@ -871,9 +870,9 @@
\ master_pod_template_spec=master_pod_template_spec,\n num_workers=nnodes,\n\
\ num_procs_per_worker=nproc_per_node,\n )\n # Save the pytorch\
\ job yaml for record keeping and debugging\n with open(pytorchjob_output_yaml.path,\
\ \"w\", encoding=\"utf-8\") as f:\n f.write(job_template.to_str(),\
\ f)\n\n # Run the pytorch job\n logging.info(f\"Creating PyTorchJob\
\ in namespace: {namespace}\")\n training_client.create_job(job_template,\
\ \"w\", encoding=\"utf-8\") as f:\n f.write(job_template.to_str())\n\
\n # Run the pytorch job\n logging.info(f\"Creating PyTorchJob in\
\ namespace: {namespace}\")\n training_client.create_job(job_template,\
\ namespace=namespace)\n\n expected_conditions = [\"Succeeded\", \"Failed\"\
]\n logging.info(f\"Monitoring job until status is any of {expected_conditions}.\"\
)\n\n def wait_for_job_get_logs(\n name: str,\n namespace:\
@@ -892,8 +891,8 @@ deploymentSpec:
\ if kfto_utils.has_condition(conditions, expected_condition):\n \
\ return conditions\n\n # Get logs dictionary\n\
\ logs_dict, _ = training_client.get_job_logs(\n \
\ name=name, namespace=namespace, job_kind=kind\n )\n\n \
\ # Stream new log lines\n for key, value in logs_dict.items():\n\
\ name=name, namespace=namespace, job_kind=job_kind\n )\n\n\
\ # Stream new log lines\n for key, value in logs_dict.items():\n\
\ if key not in log_lines:\n logging.info(key)\n\
\ log_lines.add(key)\n\n for line in value.split(\"\
\\n\"):\n if line not in log_lines:\n \
@@ -995,28 +994,27 @@ deploymentSpec:
\ models.V1Volume(\n name=\"model\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\
\ claim_name=model_pvc_name\n ),\n ),\n\
\ models.V1Volume(\n name=\"output\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\
\ claim_name=output_pvc_name\n ),\n )\n\
\ ]\n\n # Set volume mounts\n volume_mounts_common = [\n \
\ claim_name=output_pvc_name\n ),\n ),\n\
\ ]\n\n # Set volume mounts\n volume_mounts_master = [\n \
\ models.V1VolumeMount(\n mount_path=\"/input_data\", name=\"\
input-data\", read_only=True\n ),\n models.V1VolumeMount(\n\
\ mount_path=\"/input_model\", name=\"model\", read_only=True\n\
\ )\n ]\n volume_mounts_master = volume_mounts_common.append(\n\
\ models.V1VolumeMount(\n mount_path=\"/output\", name=\"\
output\"\n )\n )\n volume_mounts_worker = volume_mounts_common.append(\n\
\ models.V1VolumeMount(\n mount_path=\"/output\", name=\"\
output\", read_only=True\n )\n )\n\n # Set env variables\n\
input-data\", read_only=True\n ),\n models.V1VolumeMount(mount_path=\"\
/input_model\", name=\"model\", read_only=True),\n models.V1VolumeMount(mount_path=\"\
/output\", name=\"output\")\n ]\n\n volume_mounts_worker = [\n \
\ models.V1VolumeMount(\n mount_path=\"/input_data\", name=\"\
input-data\", read_only=True\n ),\n models.V1VolumeMount(mount_path=\"\
/input_model\", name=\"model\", read_only=True),\n models.V1VolumeMount(mount_path=\"\
/output\", name=\"output\", read_only=True)\n ]\n\n # Set env variables\n\
\ env_vars = [\n models.V1EnvVar(name=\"NNODES\", value=f\"{nnodes}\"\
),\n models.V1EnvVar(\n name=\"NPROC_PER_NODE\", value=f\"\
{nproc_per_node}\"\n ),\n models.V1EnvVar(name=\"XDG_CACHE_HOME\"\
, value=\"/tmp\"),\n models.V1EnvVar(name=\"TRITON_CACHE_DIR\", value=\"\
/tmp\"),\n models.V1EnvVar(name=\"HF_HOME\", value=\"/tmp\"),\n \
\ models.V1EnvVar(name=\"TRANSFORMERS_CACHE\", value=\"/tmp\")\n \
\ ]\n\n # Get master and worker container specs\n master_container_spec\
\ = kfto_utils.get_container_spec(\n base_image=base_image,\n \
\ name=\"pytorch\",\n resources=resources_per_worker,\n \
\ volume_mounts=volume_mounts_master,\n )\n\n # In the next release\
\ of kubeflow-training, the command\n # and the args will be a part of\
\ kfto_utils.get_container_spec function\n master_container_spec.command\
),\n models.V1EnvVar(name=\"NPROC_PER_NODE\", value=f\"{nproc_per_node}\"\
),\n models.V1EnvVar(name=\"XDG_CACHE_HOME\", value=\"/tmp\"),\n\
\ models.V1EnvVar(name=\"TRITON_CACHE_DIR\", value=\"/tmp\"),\n \
\ models.V1EnvVar(name=\"HF_HOME\", value=\"/tmp\"),\n models.V1EnvVar(name=\"\
TRANSFORMERS_CACHE\", value=\"/tmp\"),\n ]\n\n # Get master and worker\
\ container specs\n master_container_spec = kfto_utils.get_container_spec(\n\
\ base_image=base_image,\n name=\"pytorch\",\n resources=resources_per_worker,\n\
\ volume_mounts=volume_mounts_master,\n )\n\n # In the next\
\ release of kubeflow-training, the command\n # and the args will be\
\ a part of kfto_utils.get_container_spec function\n master_container_spec.command\
\ = command\n master_container_spec.args = master_args\n\n master_container_spec.env\
\ = env_vars\n\n worker_container_spec = kfto_utils.get_container_spec(\n\
\ base_image=base_image,\n name=\"pytorch\",\n resources=resources_per_worker,\n\
@@ -1037,9 +1035,9 @@
\ master_pod_template_spec=master_pod_template_spec,\n num_workers=nnodes,\n\
\ num_procs_per_worker=nproc_per_node,\n )\n # Save the pytorch\
\ job yaml for record keeping and debugging\n with open(pytorchjob_output_yaml.path,\
\ \"w\", encoding=\"utf-8\") as f:\n f.write(job_template.to_str(),\
\ f)\n\n # Run the pytorch job\n logging.info(f\"Creating PyTorchJob\
\ in namespace: {namespace}\")\n training_client.create_job(job_template,\
\ \"w\", encoding=\"utf-8\") as f:\n f.write(job_template.to_str())\n\
\n # Run the pytorch job\n logging.info(f\"Creating PyTorchJob in\
\ namespace: {namespace}\")\n training_client.create_job(job_template,\
\ namespace=namespace)\n\n expected_conditions = [\"Succeeded\", \"Failed\"\
]\n logging.info(f\"Monitoring job until status is any of {expected_conditions}.\"\
)\n\n def wait_for_job_get_logs(\n name: str,\n namespace:\
@@ -1058,8 +1056,8 @@ deploymentSpec:
\ if kfto_utils.has_condition(conditions, expected_condition):\n \
\ return conditions\n\n # Get logs dictionary\n\
\ logs_dict, _ = training_client.get_job_logs(\n \
\ name=name, namespace=namespace, job_kind=kind\n )\n\n \
\ # Stream new log lines\n for key, value in logs_dict.items():\n\
\ name=name, namespace=namespace, job_kind=job_kind\n )\n\n\
\ # Stream new log lines\n for key, value in logs_dict.items():\n\
\ if key not in log_lines:\n logging.info(key)\n\
\ log_lines.add(key)\n\n for line in value.split(\"\
\\n\"):\n if line not in log_lines:\n \
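Note: the escaped blocks above are the compiled copies of the pytorch_job_launcher_op component embedded in pipeline.yaml; the source change lives in training/components.py below. The central fix, sketched minimally here with the same kubeflow.training model classes the component uses: Python's list.append() mutates in place and returns None, so the old volume_mounts_master and volume_mounts_worker assignments ended up as None. The sketch is illustrative only, not the component code.

# Minimal sketch of the append bug (requires the kubeflow-training SDK; names
# mirror the diff, values are placeholders).
from kubeflow.training import models

volume_mounts_common = [
    models.V1VolumeMount(mount_path="/input_data", name="input-data", read_only=True),
    models.V1VolumeMount(mount_path="/input_model", name="model", read_only=True),
]

# Old pattern: list.append() returns None, so the assignment is None.
volume_mounts_master = volume_mounts_common.append(
    models.V1VolumeMount(mount_path="/output", name="output")
)
assert volume_mounts_master is None

# New pattern: build each mount list explicitly, as the commit now does.
volume_mounts_master = [
    models.V1VolumeMount(mount_path="/input_data", name="input-data", read_only=True),
    models.V1VolumeMount(mount_path="/input_model", name="model", read_only=True),
    models.V1VolumeMount(mount_path="/output", name="output"),
]
volume_mounts_worker = [
    models.V1VolumeMount(mount_path="/input_data", name="input-data", read_only=True),
    models.V1VolumeMount(mount_path="/input_model", name="model", read_only=True),
    models.V1VolumeMount(mount_path="/output", name="output", read_only=True),
]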
25 changes: 14 additions & 11 deletions training/components.py
@@ -144,12 +144,12 @@ def pytorch_job_launcher_op(
job_timeout: int = 86400,
delete_after_done: bool = False,
):
import time
import logging
from kubeflow.training import TrainingClient
from kubeflow.training.utils import utils as kfto_utils
from kubeflow.training import models
import os
import time

from kubeflow.training import TrainingClient, models
from kubeflow.training.utils import utils as kfto_utils

def list_phase1_final_model():
model_dir = "/output/phase_1/model/hf_format"
@@ -256,18 +256,21 @@ def list_phase1_final_model():
]

# Set volume mounts
volume_mounts_common = [
volume_mounts_master = [
models.V1VolumeMount(
mount_path="/input_data", name="input-data", read_only=True
),
models.V1VolumeMount(mount_path="/input_model", name="model", read_only=True),
]
volume_mounts_master = volume_mounts_common.append(
models.V1VolumeMount(mount_path="/output", name="output")
)
volume_mounts_worker = volume_mounts_common.append(
]

volume_mounts_worker = [
models.V1VolumeMount(
mount_path="/input_data", name="input-data", read_only=True
),
models.V1VolumeMount(mount_path="/input_model", name="model", read_only=True),
models.V1VolumeMount(mount_path="/output", name="output", read_only=True)
)
]

# Set env variables
env_vars = [
@@ -337,7 +340,7 @@ def list_phase1_final_model():
)
# Save the pytorch job yaml for record keeping and debugging
with open(pytorchjob_output_yaml.path, "w", encoding="utf-8") as f:
f.write(job_template.to_str(), f)
f.write(job_template.to_str())

# Run the pytorch job
logging.info(f"Creating PyTorchJob in namespace: {namespace}")
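Note: two smaller fixes round out the commit: file.write() takes a single string, so the extra file-handle argument in the old f.write(job_template.to_str(), f) call is dropped, and the log-streaming helper in pipeline.yaml now passes job_kind=job_kind instead of the undefined name kind. A self-contained sketch follows; the helper name, paths, and values are placeholders, not the component's real objects.

import logging

logging.basicConfig(level=logging.INFO)

# 1) write() accepts exactly one string; passing the file handle as a second
#    argument raised TypeError in the old code.
yaml_text = "kind: PyTorchJob\n"  # placeholder for job_template.to_str()
with open("/tmp/pytorchjob_output.yaml", "w", encoding="utf-8") as f:
    f.write(yaml_text)  # old: f.write(yaml_text, f)

# 2) The get_job_logs call inside the helper now uses the job_kind value it
#    was given rather than an undefined name.
def stream_job_log_keys(client, name, namespace, job_kind):
    # client is assumed to be a kubeflow.training.TrainingClient
    logs_dict, _ = client.get_job_logs(name=name, namespace=namespace, job_kind=job_kind)
    for key in logs_dict:
        logging.info(key)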
