From 7c50e94a1a4efacc212b2d6887eb21ed1418b225 Mon Sep 17 00:00:00 2001 From: Shreyanand Date: Fri, 29 Nov 2024 09:24:15 -0500 Subject: [PATCH] Add kfto component for training Update launcher component with correct params Remove namespace argument Fix formatting Add PR suggestions Add changes for RHEL AI image 1.3 Add changes for RHEL AI image 1.3 Add ruff changes Fix pipeline errors Add RHEL 1.3.1 image Add ruff changes Add uv make pipeline.yaml Change launcher image and get logs func Add make pipeline changes Add latest RHOAI python image Fix rebase errors Change formatting errors and add right image shas Rebase over latest changes Signed-off-by: Shreyanand --- pipeline.py | 9 +- pipeline.yaml | 648 +++++++++++++++++------------------------ training/__init__.py | 4 +- training/components.py | 491 +++++++++++++------------------ utils/consts.py | 2 +- 5 files changed, 484 insertions(+), 670 deletions(-) diff --git a/pipeline.py b/pipeline.py index aeefc6c8..c5435da2 100644 --- a/pipeline.py +++ b/pipeline.py @@ -27,7 +27,7 @@ from training import ( data_processing_op, knowledge_processed_data_to_artifact_op, - pytorchjob_manifest_op, + pytorch_job_launcher_op, skills_processed_data_to_artifact_op, ) from utils import ( @@ -36,6 +36,7 @@ pvc_to_model_op, pvc_to_mt_bench_op, ) +from utils.consts import RHELAI_IMAGE TEACHER_CONFIG_MAP = "teacher-server" TEACHER_SECRET = "teacher-server" @@ -264,12 +265,13 @@ def ilab_pipeline( # Training 1 # Using pvc_create_task.output as PyTorchJob name since dsl.PIPELINE_* global variables do not template/work in KFP v2 # https://github.com/kubeflow/pipelines/issues/10453 - training_phase_1 = pytorchjob_manifest_op( + training_phase_1 = pytorch_job_launcher_op( model_pvc_name=model_pvc_task.output, input_pvc_name=sdg_input_pvc_task.output, name_suffix=sdg_input_pvc_task.output, output_pvc_name=output_pvc_task.output, phase_num=1, + base_image=RHELAI_IMAGE, nproc_per_node=train_nproc_per_node, nnodes=train_nnodes, num_epochs=train_num_epochs_phase_1, @@ -284,12 +286,13 @@ def ilab_pipeline( training_phase_1.set_caching_options(False) #### Train 2 - training_phase_2 = pytorchjob_manifest_op( + training_phase_2 = pytorch_job_launcher_op( model_pvc_name=model_pvc_task.output, input_pvc_name=sdg_input_pvc_task.output, name_suffix=sdg_input_pvc_task.output, output_pvc_name=output_pvc_task.output, phase_num=2, + base_image=RHELAI_IMAGE, nproc_per_node=train_nproc_per_node, nnodes=train_nnodes, num_epochs=train_num_epochs_phase_2, diff --git a/pipeline.yaml b/pipeline.yaml index e00d8619..e3d77635 100644 --- a/pipeline.yaml +++ b/pipeline.yaml @@ -343,16 +343,26 @@ components: artifactType: schemaTitle: system.Artifact schemaVersion: 0.0.1 - comp-pytorchjob-manifest-op: - executorLabel: exec-pytorchjob-manifest-op + comp-pytorch-job-launcher-op: + executorLabel: exec-pytorch-job-launcher-op inputDefinitions: parameters: + base_image: + parameterType: STRING + delete_after_done: + defaultValue: false + isOptional: true + parameterType: BOOLEAN effective_batch_size: defaultValue: 3840.0 isOptional: true parameterType: NUMBER_INTEGER input_pvc_name: parameterType: STRING + job_timeout: + defaultValue: 86400.0 + isOptional: true + parameterType: NUMBER_INTEGER learning_rate: defaultValue: 0.0001 isOptional: true @@ -393,16 +403,32 @@ components: defaultValue: 42.0 isOptional: true parameterType: NUMBER_INTEGER - comp-pytorchjob-manifest-op-2: - executorLabel: exec-pytorchjob-manifest-op-2 + outputDefinitions: + artifacts: + pytorchjob_output_yaml: + artifactType: + 
schemaTitle: system.Artifact + schemaVersion: 0.0.1 + comp-pytorch-job-launcher-op-2: + executorLabel: exec-pytorch-job-launcher-op-2 inputDefinitions: parameters: + base_image: + parameterType: STRING + delete_after_done: + defaultValue: false + isOptional: true + parameterType: BOOLEAN effective_batch_size: defaultValue: 3840.0 isOptional: true parameterType: NUMBER_INTEGER input_pvc_name: parameterType: STRING + job_timeout: + defaultValue: 86400.0 + isOptional: true + parameterType: NUMBER_INTEGER learning_rate: defaultValue: 0.0001 isOptional: true @@ -443,6 +469,12 @@ components: defaultValue: 42.0 isOptional: true parameterType: NUMBER_INTEGER + outputDefinitions: + artifacts: + pytorchjob_output_yaml: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 comp-run-final-eval-op: executorLabel: exec-run-final-eval-op inputDefinitions: @@ -711,13 +743,13 @@ deploymentSpec: - /bin/sh - -c image: registry.redhat.io/ubi9/toolbox@sha256:da31dee8904a535d12689346e65e5b00d11a6179abf1fa69b548dbd755fa2770 - exec-pytorchjob-manifest-op: + exec-pytorch-job-launcher-op: container: args: - --executor_input - '{{$}}' - --function_to_execute - - pytorchjob_manifest_op + - pytorch_job_launcher_op command: - sh - -ec @@ -730,198 +762,124 @@ deploymentSpec: ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef pytorchjob_manifest_op(\n model_pvc_name: str,\n input_pvc_name:\ - \ str,\n output_pvc_name: str,\n name_suffix: str,\n # path_to_model:\ - \ str,\n phase_num: int,\n nproc_per_node: int = 3,\n nnodes: int\ - \ = 2,\n num_epochs: int = 2,\n effective_batch_size: int = 3840,\n\ - \ learning_rate: float = 1e-4,\n num_warmup_steps: int = 800,\n \ - \ save_samples: int = 0,\n max_batch_len: int = 20000,\n seed: int\ - \ = 42,\n):\n import inspect\n import os\n import time\n\n import\ - \ kubernetes\n import urllib3\n import yaml\n\n def list_phase1_final_model():\n\ - \ model_dir = \"/output/phase_1/model/hf_format\"\n models\ - \ = os.listdir(model_dir)\n newest_idx = max(\n (os.path.getmtime(f\"\ - {model_dir}/{model}\"), i)\n for i, model in enumerate(models)\n\ - \ )[-1]\n newest_model = models[newest_idx]\n return\ - \ f\"{model_dir}/{newest_model}\"\n\n name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\ + \ *\n\ndef pytorch_job_launcher_op(\n pytorchjob_output_yaml: dsl.Output[dsl.Artifact],\n\ + \ model_pvc_name: str,\n input_pvc_name: str,\n output_pvc_name:\ + \ str,\n name_suffix: str,\n phase_num: int,\n base_image: str,\n\ + \ nproc_per_node: int = 3,\n nnodes: int = 2,\n num_epochs: int\ + \ = 2,\n effective_batch_size: int = 3840,\n learning_rate: float\ + \ = 1e-4,\n num_warmup_steps: int = 800,\n save_samples: int = 0,\n\ + \ max_batch_len: int = 20000,\n seed: int = 42,\n job_timeout:\ + \ int = 86400,\n delete_after_done: bool = False,\n):\n import logging\n\ + \ import os\n\n from kubeflow.training import TrainingClient, models\n\ + \ from kubeflow.training.utils import utils as kfto_utils\n\n def\ + \ list_phase1_final_model():\n model_dir = \"/output/phase_1/model/hf_format\"\ + \n model_list = os.listdir(model_dir)\n newest_idx = max(\n\ + \ (os.path.getmtime(f\"{model_dir}/{model}\"), i)\n \ + \ for i, model in enumerate(model_list)\n )[-1]\n newest_model\ + \ = model_list[newest_idx]\n return f\"{model_dir}/{newest_model}\"\ \n\n if phase_num == 1:\n path_to_model = \"/input_model\"\n \ \ path_to_data = \"/input_data/knowledge/data.jsonl\"\n elif phase_num\ \ == 2:\n path_to_model = 
list_phase1_final_model()\n path_to_data\ \ = \"/input_data/skills/data.jsonl\"\n else:\n raise RuntimeError(f\"\ - Unsupported value of {phase_num=}\")\n\n image = \"registry.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1\"\ - \n\n manifest = inspect.cleandoc(\n f\"\"\"\n apiVersion:\ - \ kubeflow.org/v1\n kind: PyTorchJob\n metadata:\n \ - \ name: {name}\n spec:\n nprocPerNode: \\\"{nproc_per_node}\\\ - \"\n pytorchReplicaSpecs:\n Master:\n replicas:\ - \ 1\n restartPolicy: OnFailure\n template:\n \ - \ metadata:\n annotations:\n \ - \ sidecar.istio.io/inject: 'false'\n spec:\n \ - \ containers:\n - args:\n \ - \ - |\n echo \"Running phase {phase_num}\"\ - \n echo \"Using {path_to_model} model for training\"\ - \n echo \"Using {path_to_data} data for training\"\ - \n mkdir -p /output/phase_{phase_num}/model;\n\ - \ mkdir -p /output/data;\n \ - \ torchrun --nnodes {nnodes} \\\n --nproc_per_node\ - \ {nproc_per_node} \\\n --node_rank \\$(RANK)\ - \ \\\n --rdzv_endpoint \\$(MASTER_ADDR):\\\ - $(MASTER_PORT) \\\n -m instructlab.training.main_ds\ - \ \\\n --model_name_or_path={path_to_model}\ - \ \\\n --data_path={path_to_data} \\\n \ - \ --output_dir=/output/phase_{phase_num}/model\ - \ \\\n --num_epochs={num_epochs} \\\n \ - \ --effective_batch_size={effective_batch_size}\ - \ \\\n --learning_rate={learning_rate} \\\n\ - \ --num_warmup_steps={num_warmup_steps} \\\n\ - \ --save_samples={save_samples} \\\n \ - \ --log_level=INFO \\\n \ - \ --max_batch_len={max_batch_len} \\\n \ - \ --seed={seed} \\\n --cpu_offload_optimizer\ - \ \\\n --cpu_offload_params_fsdp \\\n \ - \ --distributed_training_framework fsdp \\\n \ - \ --checkpoint_at_epoch\n \ - \ command:\n - /bin/bash\n \ - \ - '-c'\n - '--'\n image:\ - \ {image}\n name: pytorch\n volumeMounts:\n\ - \ - mountPath: /input_data\n \ - \ name: input-data\n readOnly: true\n \ - \ - mountPath: /input_model\n \ - \ name: model\n readOnly: true\n \ - \ - mountPath: /output\n name: output\n\ - \ env:\n - name: NNODES\n \ - \ value: \\\"{nnodes}\\\"\n \ - \ - name: NPROC_PER_NODE\n value: \\\"{nproc_per_node}\\\ - \"\n - name: XDG_CACHE_HOME\n \ - \ value: /tmp\n - name: TRITON_CACHE_DIR\n\ - \ value: /tmp\n - name:\ - \ HF_HOME\n value: /tmp\n \ - \ - name: TRANSFORMERS_CACHE\n value: /tmp\n\ - \ resources:\n requests:\n \ - \ \"nvidia.com/gpu\": {nproc_per_node}\n \ - \ limits:\n \"nvidia.com/gpu\"\ - : {nproc_per_node}\n volumes:\n - name:\ - \ input-data\n persistentVolumeClaim:\n \ - \ claimName: {input_pvc_name}\n - name: model\n\ - \ persistentVolumeClaim:\n claimName:\ - \ {model_pvc_name}\n - name: output\n \ - \ persistentVolumeClaim:\n claimName: {output_pvc_name}\n\ - \ Worker:\n replicas: {nnodes-1}\n \ - \ restartPolicy: OnFailure\n template:\n metadata:\n\ - \ annotations:\n sidecar.istio.io/inject:\ - \ 'false'\n spec:\n containers:\n \ - \ - args:\n - |\n \ - \ echo \"Running phase {phase_num}\"\n echo\ - \ \"Using {path_to_model} model for training\"\n \ - \ echo \"Using {path_to_data} data for training\"\n \ - \ mkdir -p /tmp/model;\n torchrun --nnodes\ - \ {nnodes} \\\n --nproc_per_node {nproc_per_node}\ - \ \\\n --node_rank \\$(RANK) \\\n \ - \ --rdzv_endpoint \\$(MASTER_ADDR):\\$(MASTER_PORT) \\\n\ - \ -m instructlab.training.main_ds \\\n \ - \ --model_name_or_path={path_to_model} \\\n \ - \ --data_path={path_to_data} \\\n \ - \ --output_dir=/tmp/model \\\n --num_epochs={num_epochs}\ - \ \\\n --effective_batch_size={effective_batch_size}\ - \ \\\n --learning_rate={learning_rate} \\\n \ - \ --num_warmup_steps={num_warmup_steps} 
\\\n \ - \ --save_samples={save_samples} \\\n \ - \ --log_level=INFO \\\n --max_batch_len={max_batch_len}\ - \ \\\n --seed={seed} \\\n \ - \ --cpu_offload_optimizer \\\n --cpu_offload_params_fsdp\ - \ \\\n --distributed_training_framework fsdp\ - \ \\\n --checkpoint_at_epoch\n \ - \ command:\n - /bin/bash\n \ - \ - '-c'\n - '--'\n \ - \ image: {image}\n name: pytorch\n \ - \ volumeMounts:\n - mountPath: /input_data\n\ - \ name: input-data\n readOnly:\ - \ true\n - mountPath: /input_model\n \ - \ name: model\n readOnly: true\n \ - \ - mountPath: /output\n \ - \ name: output\n readOnly: true\n \ - \ env:\n - name: NNODES\n \ - \ value: \\\"{nnodes}\\\"\n - name: NPROC_PER_NODE\n\ - \ value: \\\"{nproc_per_node}\\\"\n \ - \ - name: XDG_CACHE_HOME\n value: /tmp\n\ - \ - name: TRITON_CACHE_DIR\n \ - \ value: /tmp\n - name: HF_HOME\n \ - \ value: /tmp\n - name: TRANSFORMERS_CACHE\n\ - \ value: /tmp\n resources:\n\ - \ requests:\n \"nvidia.com/gpu\"\ - : {nproc_per_node}\n limits:\n \ - \ \"nvidia.com/gpu\": {nproc_per_node}\n volumes:\n\ - \ - name: input-data\n persistentVolumeClaim:\n\ - \ claimName: {input_pvc_name}\n \ - \ - name: model\n persistentVolumeClaim:\n \ - \ claimName: {model_pvc_name}\n - name:\ - \ output\n persistentVolumeClaim:\n \ - \ claimName: {output_pvc_name}\n \"\"\"\n )\n\n try:\n\ - \ manifest_yaml = yaml.safe_load(manifest)\n except yaml.YAMLError\ - \ as exc:\n raise RuntimeError(f\"Error parsing manifest: {exc}\"\ - ) from exc\n\n # Discover the namespace in which the pod is running\n\ - \ with open(\n \"/var/run/secrets/kubernetes.io/serviceaccount/namespace\"\ - , \"r\", encoding=\"utf-8\"\n ) as f:\n namespace = f.read().strip()\n\ - \ print(f\"The pod is running in the namespace: {namespace}\")\n\n\ - \ try:\n kubernetes.config.load_kube_config()\n print(\"\ - Loaded kube config\")\n except kubernetes.config.ConfigException:\n \ - \ print(\"Failed to load kube config. 
Trying in-cluster config\")\n\ - \ kubernetes.config.load_incluster_config()\n\n api = kubernetes.client.CustomObjectsApi()\n\ - \ try:\n api.create_namespaced_custom_object(\n group=\"\ - kubeflow.org\",\n version=\"v1\",\n namespace=namespace,\n\ - \ plural=\"pytorchjobs\",\n body=manifest_yaml,\n\ - \ )\n except kubernetes.client.rest.ApiException as exc:\n \ - \ if exc.status == 409:\n print(\n \"{} '{}/{}'\ - \ already exists.\".format(\n manifest_yaml[\"kind\"\ - ],\n namespace,\n manifest_yaml[\"\ - metadata\"][\"name\"],\n )\n )\n else:\n\ - \ raise\n\n # Get the CR status and wait for it to be completed\n\ - \ w = kubernetes.watch.Watch()\n exit_flag = False\n start_time\ - \ = time.time()\n timeout_seconds = 24 * 60 * 60 # 24 hours\n\n while\ - \ not exit_flag: # Keep the watch active\n if time.time() - start_time\ - \ > timeout_seconds:\n raise RuntimeError(\n \"\ - Timeout (24h) reached waiting for the PytorchJob to complete.\"\n \ - \ )\n\n try:\n print(\"Watching for PytorchJob\"\ - )\n for event in w.stream(\n api.list_namespaced_custom_object,\n\ - \ group=\"kubeflow.org\",\n version=\"v1\"\ - ,\n namespace=namespace,\n plural=\"pytorchjobs\"\ - ,\n timeout_seconds=60, # Timeout after 1 minute\n \ - \ ):\n pytorchjob_event = event[\"object\"]\n \ - \ if (\n pytorchjob_event[\"metadata\"][\"\ - name\"]\n != manifest_yaml[\"metadata\"][\"name\"]\n\ - \ ):\n continue\n pytorchjob_name\ - \ = pytorchjob_event[\"metadata\"][\"name\"]\n\n if (\n \ - \ \"status\" not in pytorchjob_event\n \ - \ or \"conditions\" not in pytorchjob_event[\"status\"]\n \ - \ ):\n continue\n print(\n \ - \ f\"PytorchJob: {pytorchjob_name} - {pytorchjob_event['status'].get('conditions',\ - \ 'No conditions yet')}\"\n )\n for job_condition\ - \ in reversed(pytorchjob_event[\"status\"][\"conditions\"]):\n \ - \ if job_condition[\"type\"] == \"Succeeded\":\n \ - \ print(\n f\"PytorchJob '{pytorchjob_name}'\ - \ completed successfully: {job_condition['reason']}\"\n \ - \ )\n print(f\"Training phase {phase_num}\ - \ completed.\")\n w.stop()\n \ - \ exit_flag = True\n # Break here to avoid going\ - \ into other conditions, we are done\n break\n \ - \ elif job_condition[\"type\"] == \"Failed\":\n \ - \ print(\n f\"PytorchJob '{pytorchjob_name}'\ - \ failed: {job_condition['reason']}\"\n )\n \ - \ w.stop()\n raise RuntimeError(\"\ - Job failed.\")\n except kubernetes.client.exceptions.ApiException\ - \ as e:\n print(f\"API exception occurred: {str(e)}\")\n \ - \ time.sleep(5) # Backoff before retrying\n # Catches the\ - \ following error:\n # urllib3.exceptions.ProtocolError: (\"Connection\ - \ broken: InvalidChunkLength\n except urllib3.exceptions.ProtocolError\ - \ as e:\n print(f\"Connection broken reconnecting the watcher\ - \ {str(e)}\")\n time.sleep(5) # Backoff before retrying\n \ - \ finally:\n w.stop()\n\n" - image: quay.io/modh/odh-generic-data-science-notebook@sha256:0efbb3ad6f8f342360cf1f002d40716a39d4c58f69163e053d5bd19b4fe732d4 - exec-pytorchjob-manifest-op-2: + Unsupported value of {phase_num=}\")\n\n resources_per_worker = {\"nvidia.com/gpu\"\ + : nproc_per_node}\n\n name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\ + \n command = [\"/bin/sh\", \"-c\", \"--\"]\n\n master_args = [\n \ + \ f\"\"\"\n echo \"Running phase {phase_num}\"\n echo\ + \ \"Using {path_to_model} model for training\"\n echo \"Using {path_to_data}\ + \ data for training\"\n mkdir -p /output/phase_{phase_num}/model;\n\ + \ mkdir -p /output/data;\n torchrun --nnodes {nnodes} \\\n\ + \ --nproc_per_node 
{nproc_per_node} \\\n --node_rank\ + \ \\$(RANK) \\\n --rdzv_endpoint \\$(MASTER_ADDR):\\$(MASTER_PORT)\ + \ \\\n -m instructlab.training.main_ds \\\n --model_name_or_path={path_to_model}\ + \ \\\n --data_path={path_to_data} \\\n --output_dir=/output/phase_{phase_num}/model\ + \ \\\n --num_epochs={num_epochs} \\\n --effective_batch_size={effective_batch_size}\ + \ \\\n --learning_rate={learning_rate} \\\n --num_warmup_steps={num_warmup_steps}\ + \ \\\n --save_samples={save_samples} \\\n --log_level=INFO\ + \ \\\n --max_batch_len={max_batch_len} \\\n --seed={seed}\ + \ \\\n --cpu_offload_optimizer \\\n --cpu_offload_params_fsdp\ + \ \\\n --distributed_training_framework fsdp \\\n \ + \ --checkpoint_at_epoch\n \"\"\"\n ]\n\n worker_args =\ + \ [\n f\"\"\"\n echo \"Running phase {phase_num}\"\n \ + \ echo \"Using {path_to_model} model for training\"\n echo \"Using\ + \ {path_to_data} data for training\"\n mkdir -p /tmp/model;\n \ + \ torchrun --nnodes {nnodes} \\\n --nproc_per_node {nproc_per_node}\ + \ \\\n --node_rank \\$(RANK) \\\n --rdzv_endpoint \\$(MASTER_ADDR):\\\ + $(MASTER_PORT) \\\n -m instructlab.training.main_ds \\\n \ + \ --model_name_or_path={path_to_model} \\\n --data_path={path_to_data}\ + \ \\\n --output_dir=/tmp/model \\\n --num_epochs={num_epochs}\ + \ \\\n --effective_batch_size={effective_batch_size} \\\n \ + \ --learning_rate={learning_rate} \\\n --num_warmup_steps={num_warmup_steps}\ + \ \\\n --save_samples={save_samples} \\\n --log_level=INFO\ + \ \\\n --max_batch_len={max_batch_len} \\\n --seed={seed}\ + \ \\\n --cpu_offload_optimizer \\\n --cpu_offload_params_fsdp\ + \ \\\n --distributed_training_framework fsdp \\\n --checkpoint_at_epoch\n\ + \ \"\"\"\n ]\n\n # Set volumes\n volumes = [\n \ + \ models.V1Volume(\n name=\"input-data\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\ + \ claim_name=input_pvc_name\n ),\n ),\n\ + \ models.V1Volume(\n name=\"model\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\ + \ claim_name=model_pvc_name\n ),\n ),\n\ + \ models.V1Volume(\n name=\"output\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\ + \ claim_name=output_pvc_name\n ),\n ),\n\ + \ ]\n\n # Set volume mounts\n volume_mounts_master = [\n \ + \ models.V1VolumeMount(\n mount_path=\"/input_data\", name=\"\ + input-data\", read_only=True\n ),\n models.V1VolumeMount(mount_path=\"\ + /input_model\", name=\"model\", read_only=True),\n models.V1VolumeMount(mount_path=\"\ + /output\", name=\"output\"),\n ]\n\n volume_mounts_worker = [\n \ + \ models.V1VolumeMount(\n mount_path=\"/input_data\", name=\"\ + input-data\", read_only=True\n ),\n models.V1VolumeMount(mount_path=\"\ + /input_model\", name=\"model\", read_only=True),\n models.V1VolumeMount(mount_path=\"\ + /output\", name=\"output\", read_only=True),\n ]\n\n # Set env variables\n\ + \ env_vars = [\n models.V1EnvVar(name=\"NNODES\", value=f\"{nnodes}\"\ + ),\n models.V1EnvVar(name=\"NPROC_PER_NODE\", value=f\"{nproc_per_node}\"\ + ),\n models.V1EnvVar(name=\"XDG_CACHE_HOME\", value=\"/tmp\"),\n\ + \ models.V1EnvVar(name=\"TRITON_CACHE_DIR\", value=\"/tmp\"),\n \ + \ models.V1EnvVar(name=\"HF_HOME\", value=\"/tmp\"),\n models.V1EnvVar(name=\"\ + TRANSFORMERS_CACHE\", value=\"/tmp\"),\n ]\n\n # Get master and worker\ + \ container specs\n master_container_spec = kfto_utils.get_container_spec(\n\ + \ base_image=base_image,\n name=\"pytorch\",\n resources=resources_per_worker,\n\ + \ volume_mounts=volume_mounts_master,\n )\n\n # In the next\ + \ 
release of kubeflow-training, the command\n # and the args will be\ + \ a part of kfto_utils.get_container_spec function\n master_container_spec.command\ + \ = command\n master_container_spec.args = master_args\n\n master_container_spec.env\ + \ = env_vars\n\n worker_container_spec = kfto_utils.get_container_spec(\n\ + \ base_image=base_image,\n name=\"pytorch\",\n resources=resources_per_worker,\n\ + \ volume_mounts=volume_mounts_worker,\n )\n worker_container_spec.command\ + \ = command\n worker_container_spec.args = worker_args\n worker_container_spec.env\ + \ = env_vars\n\n # create master pod spec\n master_pod_template_spec\ + \ = kfto_utils.get_pod_template_spec(\n containers=[master_container_spec],\n\ + \ volumes=volumes,\n )\n\n # create worker pod spec\n worker_pod_template_spec\ + \ = kfto_utils.get_pod_template_spec(\n containers=[worker_container_spec],\n\ + \ volumes=volumes,\n )\n\n logging.getLogger(__name__).setLevel(logging.INFO)\n\ + \ logging.info(\"Generating job template.\")\n logging.info(\"Creating\ + \ TrainingClient.\")\n\n # Initialize training client\n # This also\ + \ finds the namespace from /var/run/secrets/kubernetes.io/serviceaccount/namespace\n\ + \ # And it also loads the kube config\n training_client = TrainingClient()\n\ + \ namespace = training_client.namespace\n\n # Create pytorch job spec\n\ + \ job_template = kfto_utils.get_pytorchjob_template(\n name=name,\n\ + \ namespace=namespace,\n worker_pod_template_spec=worker_pod_template_spec,\n\ + \ master_pod_template_spec=master_pod_template_spec,\n num_workers=nnodes,\n\ + \ num_procs_per_worker=nproc_per_node,\n )\n # Save the pytorch\ + \ job yaml for record keeping and debugging\n with open(pytorchjob_output_yaml.path,\ + \ \"w\", encoding=\"utf-8\") as f:\n f.write(job_template.to_str())\n\ + \n # Run the pytorch job\n logging.info(f\"Creating PyTorchJob in\ + \ namespace: {namespace}\")\n training_client.create_job(job_template,\ + \ namespace=namespace)\n\n expected_conditions = [\"Succeeded\", \"Failed\"\ + ]\n logging.info(f\"Monitoring job until status is any of {expected_conditions}.\"\ + )\n\n def get_logs(job):\n _, _ = training_client.get_job_logs(name=job.metadata.name,\ + \ follow=True)\n\n training_client.wait_for_job_conditions(\n \ + \ name=name,\n expected_conditions=set(expected_conditions),\n \ + \ wait_timeout=job_timeout,\n timeout=job_timeout,\n \ + \ callback=get_logs,\n )\n\n if delete_after_done:\n logging.info(\"\ + Deleting job after completion.\")\n training_client.delete_job(name,\ + \ namespace)\n\n" + image: quay.io/modh/odh-generic-data-science-notebook@sha256:72c1d095adbda216a1f1b4b6935e3e2c717cbc58964009464ccd36c0b98312b2 + exec-pytorch-job-launcher-op-2: container: args: - --executor_input - '{{$}}' - --function_to_execute - - pytorchjob_manifest_op + - pytorch_job_launcher_op command: - sh - -ec @@ -934,191 +892,117 @@ deploymentSpec: ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef pytorchjob_manifest_op(\n model_pvc_name: str,\n input_pvc_name:\ - \ str,\n output_pvc_name: str,\n name_suffix: str,\n # path_to_model:\ - \ str,\n phase_num: int,\n nproc_per_node: int = 3,\n nnodes: int\ - \ = 2,\n num_epochs: int = 2,\n effective_batch_size: int = 3840,\n\ - \ learning_rate: float = 1e-4,\n num_warmup_steps: int = 800,\n \ - \ save_samples: int = 0,\n max_batch_len: int = 20000,\n seed: int\ - \ = 42,\n):\n import inspect\n import os\n import time\n\n import\ - \ kubernetes\n import urllib3\n import yaml\n\n def 
list_phase1_final_model():\n\ - \ model_dir = \"/output/phase_1/model/hf_format\"\n models\ - \ = os.listdir(model_dir)\n newest_idx = max(\n (os.path.getmtime(f\"\ - {model_dir}/{model}\"), i)\n for i, model in enumerate(models)\n\ - \ )[-1]\n newest_model = models[newest_idx]\n return\ - \ f\"{model_dir}/{newest_model}\"\n\n name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\ + \ *\n\ndef pytorch_job_launcher_op(\n pytorchjob_output_yaml: dsl.Output[dsl.Artifact],\n\ + \ model_pvc_name: str,\n input_pvc_name: str,\n output_pvc_name:\ + \ str,\n name_suffix: str,\n phase_num: int,\n base_image: str,\n\ + \ nproc_per_node: int = 3,\n nnodes: int = 2,\n num_epochs: int\ + \ = 2,\n effective_batch_size: int = 3840,\n learning_rate: float\ + \ = 1e-4,\n num_warmup_steps: int = 800,\n save_samples: int = 0,\n\ + \ max_batch_len: int = 20000,\n seed: int = 42,\n job_timeout:\ + \ int = 86400,\n delete_after_done: bool = False,\n):\n import logging\n\ + \ import os\n\n from kubeflow.training import TrainingClient, models\n\ + \ from kubeflow.training.utils import utils as kfto_utils\n\n def\ + \ list_phase1_final_model():\n model_dir = \"/output/phase_1/model/hf_format\"\ + \n model_list = os.listdir(model_dir)\n newest_idx = max(\n\ + \ (os.path.getmtime(f\"{model_dir}/{model}\"), i)\n \ + \ for i, model in enumerate(model_list)\n )[-1]\n newest_model\ + \ = model_list[newest_idx]\n return f\"{model_dir}/{newest_model}\"\ \n\n if phase_num == 1:\n path_to_model = \"/input_model\"\n \ \ path_to_data = \"/input_data/knowledge/data.jsonl\"\n elif phase_num\ \ == 2:\n path_to_model = list_phase1_final_model()\n path_to_data\ \ = \"/input_data/skills/data.jsonl\"\n else:\n raise RuntimeError(f\"\ - Unsupported value of {phase_num=}\")\n\n image = \"registry.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1\"\ - \n\n manifest = inspect.cleandoc(\n f\"\"\"\n apiVersion:\ - \ kubeflow.org/v1\n kind: PyTorchJob\n metadata:\n \ - \ name: {name}\n spec:\n nprocPerNode: \\\"{nproc_per_node}\\\ - \"\n pytorchReplicaSpecs:\n Master:\n replicas:\ - \ 1\n restartPolicy: OnFailure\n template:\n \ - \ metadata:\n annotations:\n \ - \ sidecar.istio.io/inject: 'false'\n spec:\n \ - \ containers:\n - args:\n \ - \ - |\n echo \"Running phase {phase_num}\"\ - \n echo \"Using {path_to_model} model for training\"\ - \n echo \"Using {path_to_data} data for training\"\ - \n mkdir -p /output/phase_{phase_num}/model;\n\ - \ mkdir -p /output/data;\n \ - \ torchrun --nnodes {nnodes} \\\n --nproc_per_node\ - \ {nproc_per_node} \\\n --node_rank \\$(RANK)\ - \ \\\n --rdzv_endpoint \\$(MASTER_ADDR):\\\ - $(MASTER_PORT) \\\n -m instructlab.training.main_ds\ - \ \\\n --model_name_or_path={path_to_model}\ - \ \\\n --data_path={path_to_data} \\\n \ - \ --output_dir=/output/phase_{phase_num}/model\ - \ \\\n --num_epochs={num_epochs} \\\n \ - \ --effective_batch_size={effective_batch_size}\ - \ \\\n --learning_rate={learning_rate} \\\n\ - \ --num_warmup_steps={num_warmup_steps} \\\n\ - \ --save_samples={save_samples} \\\n \ - \ --log_level=INFO \\\n \ - \ --max_batch_len={max_batch_len} \\\n \ - \ --seed={seed} \\\n --cpu_offload_optimizer\ - \ \\\n --cpu_offload_params_fsdp \\\n \ - \ --distributed_training_framework fsdp \\\n \ - \ --checkpoint_at_epoch\n \ - \ command:\n - /bin/bash\n \ - \ - '-c'\n - '--'\n image:\ - \ {image}\n name: pytorch\n volumeMounts:\n\ - \ - mountPath: /input_data\n \ - \ name: input-data\n readOnly: true\n \ - \ - mountPath: /input_model\n \ - \ name: model\n readOnly: true\n \ - \ - 
mountPath: /output\n name: output\n\ - \ env:\n - name: NNODES\n \ - \ value: \\\"{nnodes}\\\"\n \ - \ - name: NPROC_PER_NODE\n value: \\\"{nproc_per_node}\\\ - \"\n - name: XDG_CACHE_HOME\n \ - \ value: /tmp\n - name: TRITON_CACHE_DIR\n\ - \ value: /tmp\n - name:\ - \ HF_HOME\n value: /tmp\n \ - \ - name: TRANSFORMERS_CACHE\n value: /tmp\n\ - \ resources:\n requests:\n \ - \ \"nvidia.com/gpu\": {nproc_per_node}\n \ - \ limits:\n \"nvidia.com/gpu\"\ - : {nproc_per_node}\n volumes:\n - name:\ - \ input-data\n persistentVolumeClaim:\n \ - \ claimName: {input_pvc_name}\n - name: model\n\ - \ persistentVolumeClaim:\n claimName:\ - \ {model_pvc_name}\n - name: output\n \ - \ persistentVolumeClaim:\n claimName: {output_pvc_name}\n\ - \ Worker:\n replicas: {nnodes-1}\n \ - \ restartPolicy: OnFailure\n template:\n metadata:\n\ - \ annotations:\n sidecar.istio.io/inject:\ - \ 'false'\n spec:\n containers:\n \ - \ - args:\n - |\n \ - \ echo \"Running phase {phase_num}\"\n echo\ - \ \"Using {path_to_model} model for training\"\n \ - \ echo \"Using {path_to_data} data for training\"\n \ - \ mkdir -p /tmp/model;\n torchrun --nnodes\ - \ {nnodes} \\\n --nproc_per_node {nproc_per_node}\ - \ \\\n --node_rank \\$(RANK) \\\n \ - \ --rdzv_endpoint \\$(MASTER_ADDR):\\$(MASTER_PORT) \\\n\ - \ -m instructlab.training.main_ds \\\n \ - \ --model_name_or_path={path_to_model} \\\n \ - \ --data_path={path_to_data} \\\n \ - \ --output_dir=/tmp/model \\\n --num_epochs={num_epochs}\ - \ \\\n --effective_batch_size={effective_batch_size}\ - \ \\\n --learning_rate={learning_rate} \\\n \ - \ --num_warmup_steps={num_warmup_steps} \\\n \ - \ --save_samples={save_samples} \\\n \ - \ --log_level=INFO \\\n --max_batch_len={max_batch_len}\ - \ \\\n --seed={seed} \\\n \ - \ --cpu_offload_optimizer \\\n --cpu_offload_params_fsdp\ - \ \\\n --distributed_training_framework fsdp\ - \ \\\n --checkpoint_at_epoch\n \ - \ command:\n - /bin/bash\n \ - \ - '-c'\n - '--'\n \ - \ image: {image}\n name: pytorch\n \ - \ volumeMounts:\n - mountPath: /input_data\n\ - \ name: input-data\n readOnly:\ - \ true\n - mountPath: /input_model\n \ - \ name: model\n readOnly: true\n \ - \ - mountPath: /output\n \ - \ name: output\n readOnly: true\n \ - \ env:\n - name: NNODES\n \ - \ value: \\\"{nnodes}\\\"\n - name: NPROC_PER_NODE\n\ - \ value: \\\"{nproc_per_node}\\\"\n \ - \ - name: XDG_CACHE_HOME\n value: /tmp\n\ - \ - name: TRITON_CACHE_DIR\n \ - \ value: /tmp\n - name: HF_HOME\n \ - \ value: /tmp\n - name: TRANSFORMERS_CACHE\n\ - \ value: /tmp\n resources:\n\ - \ requests:\n \"nvidia.com/gpu\"\ - : {nproc_per_node}\n limits:\n \ - \ \"nvidia.com/gpu\": {nproc_per_node}\n volumes:\n\ - \ - name: input-data\n persistentVolumeClaim:\n\ - \ claimName: {input_pvc_name}\n \ - \ - name: model\n persistentVolumeClaim:\n \ - \ claimName: {model_pvc_name}\n - name:\ - \ output\n persistentVolumeClaim:\n \ - \ claimName: {output_pvc_name}\n \"\"\"\n )\n\n try:\n\ - \ manifest_yaml = yaml.safe_load(manifest)\n except yaml.YAMLError\ - \ as exc:\n raise RuntimeError(f\"Error parsing manifest: {exc}\"\ - ) from exc\n\n # Discover the namespace in which the pod is running\n\ - \ with open(\n \"/var/run/secrets/kubernetes.io/serviceaccount/namespace\"\ - , \"r\", encoding=\"utf-8\"\n ) as f:\n namespace = f.read().strip()\n\ - \ print(f\"The pod is running in the namespace: {namespace}\")\n\n\ - \ try:\n kubernetes.config.load_kube_config()\n print(\"\ - Loaded kube config\")\n except kubernetes.config.ConfigException:\n \ - \ print(\"Failed to load kube config. 
Trying in-cluster config\")\n\ - \ kubernetes.config.load_incluster_config()\n\n api = kubernetes.client.CustomObjectsApi()\n\ - \ try:\n api.create_namespaced_custom_object(\n group=\"\ - kubeflow.org\",\n version=\"v1\",\n namespace=namespace,\n\ - \ plural=\"pytorchjobs\",\n body=manifest_yaml,\n\ - \ )\n except kubernetes.client.rest.ApiException as exc:\n \ - \ if exc.status == 409:\n print(\n \"{} '{}/{}'\ - \ already exists.\".format(\n manifest_yaml[\"kind\"\ - ],\n namespace,\n manifest_yaml[\"\ - metadata\"][\"name\"],\n )\n )\n else:\n\ - \ raise\n\n # Get the CR status and wait for it to be completed\n\ - \ w = kubernetes.watch.Watch()\n exit_flag = False\n start_time\ - \ = time.time()\n timeout_seconds = 24 * 60 * 60 # 24 hours\n\n while\ - \ not exit_flag: # Keep the watch active\n if time.time() - start_time\ - \ > timeout_seconds:\n raise RuntimeError(\n \"\ - Timeout (24h) reached waiting for the PytorchJob to complete.\"\n \ - \ )\n\n try:\n print(\"Watching for PytorchJob\"\ - )\n for event in w.stream(\n api.list_namespaced_custom_object,\n\ - \ group=\"kubeflow.org\",\n version=\"v1\"\ - ,\n namespace=namespace,\n plural=\"pytorchjobs\"\ - ,\n timeout_seconds=60, # Timeout after 1 minute\n \ - \ ):\n pytorchjob_event = event[\"object\"]\n \ - \ if (\n pytorchjob_event[\"metadata\"][\"\ - name\"]\n != manifest_yaml[\"metadata\"][\"name\"]\n\ - \ ):\n continue\n pytorchjob_name\ - \ = pytorchjob_event[\"metadata\"][\"name\"]\n\n if (\n \ - \ \"status\" not in pytorchjob_event\n \ - \ or \"conditions\" not in pytorchjob_event[\"status\"]\n \ - \ ):\n continue\n print(\n \ - \ f\"PytorchJob: {pytorchjob_name} - {pytorchjob_event['status'].get('conditions',\ - \ 'No conditions yet')}\"\n )\n for job_condition\ - \ in reversed(pytorchjob_event[\"status\"][\"conditions\"]):\n \ - \ if job_condition[\"type\"] == \"Succeeded\":\n \ - \ print(\n f\"PytorchJob '{pytorchjob_name}'\ - \ completed successfully: {job_condition['reason']}\"\n \ - \ )\n print(f\"Training phase {phase_num}\ - \ completed.\")\n w.stop()\n \ - \ exit_flag = True\n # Break here to avoid going\ - \ into other conditions, we are done\n break\n \ - \ elif job_condition[\"type\"] == \"Failed\":\n \ - \ print(\n f\"PytorchJob '{pytorchjob_name}'\ - \ failed: {job_condition['reason']}\"\n )\n \ - \ w.stop()\n raise RuntimeError(\"\ - Job failed.\")\n except kubernetes.client.exceptions.ApiException\ - \ as e:\n print(f\"API exception occurred: {str(e)}\")\n \ - \ time.sleep(5) # Backoff before retrying\n # Catches the\ - \ following error:\n # urllib3.exceptions.ProtocolError: (\"Connection\ - \ broken: InvalidChunkLength\n except urllib3.exceptions.ProtocolError\ - \ as e:\n print(f\"Connection broken reconnecting the watcher\ - \ {str(e)}\")\n time.sleep(5) # Backoff before retrying\n \ - \ finally:\n w.stop()\n\n" - image: quay.io/modh/odh-generic-data-science-notebook@sha256:0efbb3ad6f8f342360cf1f002d40716a39d4c58f69163e053d5bd19b4fe732d4 + Unsupported value of {phase_num=}\")\n\n resources_per_worker = {\"nvidia.com/gpu\"\ + : nproc_per_node}\n\n name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\ + \n command = [\"/bin/sh\", \"-c\", \"--\"]\n\n master_args = [\n \ + \ f\"\"\"\n echo \"Running phase {phase_num}\"\n echo\ + \ \"Using {path_to_model} model for training\"\n echo \"Using {path_to_data}\ + \ data for training\"\n mkdir -p /output/phase_{phase_num}/model;\n\ + \ mkdir -p /output/data;\n torchrun --nnodes {nnodes} \\\n\ + \ --nproc_per_node {nproc_per_node} \\\n --node_rank\ + \ 
\\$(RANK) \\\n --rdzv_endpoint \\$(MASTER_ADDR):\\$(MASTER_PORT)\ + \ \\\n -m instructlab.training.main_ds \\\n --model_name_or_path={path_to_model}\ + \ \\\n --data_path={path_to_data} \\\n --output_dir=/output/phase_{phase_num}/model\ + \ \\\n --num_epochs={num_epochs} \\\n --effective_batch_size={effective_batch_size}\ + \ \\\n --learning_rate={learning_rate} \\\n --num_warmup_steps={num_warmup_steps}\ + \ \\\n --save_samples={save_samples} \\\n --log_level=INFO\ + \ \\\n --max_batch_len={max_batch_len} \\\n --seed={seed}\ + \ \\\n --cpu_offload_optimizer \\\n --cpu_offload_params_fsdp\ + \ \\\n --distributed_training_framework fsdp \\\n \ + \ --checkpoint_at_epoch\n \"\"\"\n ]\n\n worker_args =\ + \ [\n f\"\"\"\n echo \"Running phase {phase_num}\"\n \ + \ echo \"Using {path_to_model} model for training\"\n echo \"Using\ + \ {path_to_data} data for training\"\n mkdir -p /tmp/model;\n \ + \ torchrun --nnodes {nnodes} \\\n --nproc_per_node {nproc_per_node}\ + \ \\\n --node_rank \\$(RANK) \\\n --rdzv_endpoint \\$(MASTER_ADDR):\\\ + $(MASTER_PORT) \\\n -m instructlab.training.main_ds \\\n \ + \ --model_name_or_path={path_to_model} \\\n --data_path={path_to_data}\ + \ \\\n --output_dir=/tmp/model \\\n --num_epochs={num_epochs}\ + \ \\\n --effective_batch_size={effective_batch_size} \\\n \ + \ --learning_rate={learning_rate} \\\n --num_warmup_steps={num_warmup_steps}\ + \ \\\n --save_samples={save_samples} \\\n --log_level=INFO\ + \ \\\n --max_batch_len={max_batch_len} \\\n --seed={seed}\ + \ \\\n --cpu_offload_optimizer \\\n --cpu_offload_params_fsdp\ + \ \\\n --distributed_training_framework fsdp \\\n --checkpoint_at_epoch\n\ + \ \"\"\"\n ]\n\n # Set volumes\n volumes = [\n \ + \ models.V1Volume(\n name=\"input-data\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\ + \ claim_name=input_pvc_name\n ),\n ),\n\ + \ models.V1Volume(\n name=\"model\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\ + \ claim_name=model_pvc_name\n ),\n ),\n\ + \ models.V1Volume(\n name=\"output\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\ + \ claim_name=output_pvc_name\n ),\n ),\n\ + \ ]\n\n # Set volume mounts\n volume_mounts_master = [\n \ + \ models.V1VolumeMount(\n mount_path=\"/input_data\", name=\"\ + input-data\", read_only=True\n ),\n models.V1VolumeMount(mount_path=\"\ + /input_model\", name=\"model\", read_only=True),\n models.V1VolumeMount(mount_path=\"\ + /output\", name=\"output\"),\n ]\n\n volume_mounts_worker = [\n \ + \ models.V1VolumeMount(\n mount_path=\"/input_data\", name=\"\ + input-data\", read_only=True\n ),\n models.V1VolumeMount(mount_path=\"\ + /input_model\", name=\"model\", read_only=True),\n models.V1VolumeMount(mount_path=\"\ + /output\", name=\"output\", read_only=True),\n ]\n\n # Set env variables\n\ + \ env_vars = [\n models.V1EnvVar(name=\"NNODES\", value=f\"{nnodes}\"\ + ),\n models.V1EnvVar(name=\"NPROC_PER_NODE\", value=f\"{nproc_per_node}\"\ + ),\n models.V1EnvVar(name=\"XDG_CACHE_HOME\", value=\"/tmp\"),\n\ + \ models.V1EnvVar(name=\"TRITON_CACHE_DIR\", value=\"/tmp\"),\n \ + \ models.V1EnvVar(name=\"HF_HOME\", value=\"/tmp\"),\n models.V1EnvVar(name=\"\ + TRANSFORMERS_CACHE\", value=\"/tmp\"),\n ]\n\n # Get master and worker\ + \ container specs\n master_container_spec = kfto_utils.get_container_spec(\n\ + \ base_image=base_image,\n name=\"pytorch\",\n resources=resources_per_worker,\n\ + \ volume_mounts=volume_mounts_master,\n )\n\n # In the next\ + \ release of kubeflow-training, the 
command\n # and the args will be\ + \ a part of kfto_utils.get_container_spec function\n master_container_spec.command\ + \ = command\n master_container_spec.args = master_args\n\n master_container_spec.env\ + \ = env_vars\n\n worker_container_spec = kfto_utils.get_container_spec(\n\ + \ base_image=base_image,\n name=\"pytorch\",\n resources=resources_per_worker,\n\ + \ volume_mounts=volume_mounts_worker,\n )\n worker_container_spec.command\ + \ = command\n worker_container_spec.args = worker_args\n worker_container_spec.env\ + \ = env_vars\n\n # create master pod spec\n master_pod_template_spec\ + \ = kfto_utils.get_pod_template_spec(\n containers=[master_container_spec],\n\ + \ volumes=volumes,\n )\n\n # create worker pod spec\n worker_pod_template_spec\ + \ = kfto_utils.get_pod_template_spec(\n containers=[worker_container_spec],\n\ + \ volumes=volumes,\n )\n\n logging.getLogger(__name__).setLevel(logging.INFO)\n\ + \ logging.info(\"Generating job template.\")\n logging.info(\"Creating\ + \ TrainingClient.\")\n\n # Initialize training client\n # This also\ + \ finds the namespace from /var/run/secrets/kubernetes.io/serviceaccount/namespace\n\ + \ # And it also loads the kube config\n training_client = TrainingClient()\n\ + \ namespace = training_client.namespace\n\n # Create pytorch job spec\n\ + \ job_template = kfto_utils.get_pytorchjob_template(\n name=name,\n\ + \ namespace=namespace,\n worker_pod_template_spec=worker_pod_template_spec,\n\ + \ master_pod_template_spec=master_pod_template_spec,\n num_workers=nnodes,\n\ + \ num_procs_per_worker=nproc_per_node,\n )\n # Save the pytorch\ + \ job yaml for record keeping and debugging\n with open(pytorchjob_output_yaml.path,\ + \ \"w\", encoding=\"utf-8\") as f:\n f.write(job_template.to_str())\n\ + \n # Run the pytorch job\n logging.info(f\"Creating PyTorchJob in\ + \ namespace: {namespace}\")\n training_client.create_job(job_template,\ + \ namespace=namespace)\n\n expected_conditions = [\"Succeeded\", \"Failed\"\ + ]\n logging.info(f\"Monitoring job until status is any of {expected_conditions}.\"\ + )\n\n def get_logs(job):\n _, _ = training_client.get_job_logs(name=job.metadata.name,\ + \ follow=True)\n\n training_client.wait_for_job_conditions(\n \ + \ name=name,\n expected_conditions=set(expected_conditions),\n \ + \ wait_timeout=job_timeout,\n timeout=job_timeout,\n \ + \ callback=get_logs,\n )\n\n if delete_after_done:\n logging.info(\"\ + Deleting job after completion.\")\n training_client.delete_job(name,\ + \ namespace)\n\n" + image: quay.io/modh/odh-generic-data-science-notebook@sha256:72c1d095adbda216a1f1b4b6935e3e2c717cbc58964009464ccd36c0b98312b2 exec-run-final-eval-op: container: args: @@ -1853,10 +1737,10 @@ root: constant: /output/mt_bench_data.json taskInfo: name: pvc-to-mt-bench-op - pytorchjob-manifest-op: + pytorch-job-launcher-op: cachingOptions: {} componentRef: - name: comp-pytorchjob-manifest-op + name: comp-pytorch-job-launcher-op dependentTasks: - createpvc - createpvc-2 @@ -1865,6 +1749,9 @@ root: - model-to-pvc-op inputs: parameters: + base_image: + runtimeValue: + constant: registry.redhat.io/rhelai1/instructlab-nvidia-rhel9@sha256:05cfba1fb13ed54b1de4d021da2a31dd78ba7d8cc48e10c7fe372815899a18ae effective_batch_size: componentInputParameter: train_effective_batch_size_phase_1 input_pvc_name: @@ -1903,18 +1790,21 @@ root: seed: componentInputParameter: train_seed taskInfo: - name: pytorchjob-manifest-op - pytorchjob-manifest-op-2: + name: pytorch-job-launcher-op + pytorch-job-launcher-op-2: cachingOptions: {} 
componentRef: - name: comp-pytorchjob-manifest-op-2 + name: comp-pytorch-job-launcher-op-2 dependentTasks: - createpvc - createpvc-2 - createpvc-3 - - pytorchjob-manifest-op + - pytorch-job-launcher-op inputs: parameters: + base_image: + runtimeValue: + constant: registry.redhat.io/rhelai1/instructlab-nvidia-rhel9@sha256:05cfba1fb13ed54b1de4d021da2a31dd78ba7d8cc48e10c7fe372815899a18ae effective_batch_size: componentInputParameter: train_effective_batch_size_phase_2 input_pvc_name: @@ -1953,7 +1843,7 @@ root: seed: componentInputParameter: train_seed taskInfo: - name: pytorchjob-manifest-op-2 + name: pytorch-job-launcher-op-2 run-final-eval-op: cachingOptions: {} componentRef: @@ -1991,7 +1881,7 @@ root: name: comp-run-mt-bench-op dependentTasks: - createpvc-3 - - pytorchjob-manifest-op-2 + - pytorch-job-launcher-op-2 inputs: parameters: max_workers: @@ -2273,7 +2163,7 @@ platforms: taskOutputParameter: outputParameterKey: name producerTask: createpvc-3 - exec-pytorchjob-manifest-op-2: + exec-pytorch-job-launcher-op-2: pvcMount: - mountPath: /output taskOutputParameter: diff --git a/training/__init__.py b/training/__init__.py index 22770e46..470c3a76 100644 --- a/training/__init__.py +++ b/training/__init__.py @@ -1,13 +1,13 @@ from .components import ( data_processing_op, knowledge_processed_data_to_artifact_op, - pytorchjob_manifest_op, + pytorch_job_launcher_op, skills_processed_data_to_artifact_op, ) __all__ = [ "data_processing_op", - "pytorchjob_manifest_op", + "pytorch_job_launcher_op", "skills_processed_data_to_artifact_op", "knowledge_processed_data_to_artifact_op", ] diff --git a/training/components.py b/training/components.py index 917f2a24..852b7412 100644 --- a/training/components.py +++ b/training/components.py @@ -1,5 +1,5 @@ # type: ignore -# pylint: disable=import-outside-toplevel,import-error +# pylint: disable=import-outside-toplevel,missing-function-docstring from typing import Optional @@ -8,7 +8,10 @@ from utils.consts import PYTHON_IMAGE, RHELAI_IMAGE, TOOLBOX_IMAGE -@dsl.component(base_image=RHELAI_IMAGE, install_kfp_package=False) +@dsl.component( + base_image=RHELAI_IMAGE, + install_kfp_package=False, +) def data_processing_op( model_path: str = "/model", sdg_path: str = "/data/sdg", @@ -120,14 +123,16 @@ def knowledge_processed_data_to_artifact_op( ) +# Change base image to the RHOAI python image with kubeflow_training once available @dsl.component(base_image=PYTHON_IMAGE, install_kfp_package=False) -def pytorchjob_manifest_op( +def pytorch_job_launcher_op( + pytorchjob_output_yaml: dsl.Output[dsl.Artifact], model_pvc_name: str, input_pvc_name: str, output_pvc_name: str, name_suffix: str, - # path_to_model: str, phase_num: int, + base_image: str, nproc_per_node: int = 3, nnodes: int = 2, num_epochs: int = 2, @@ -137,27 +142,25 @@ def pytorchjob_manifest_op( save_samples: int = 0, max_batch_len: int = 20000, seed: int = 42, + job_timeout: int = 86400, + delete_after_done: bool = False, ): - import inspect + import logging import os - import time - import kubernetes - import urllib3 - import yaml + from kubeflow.training import TrainingClient, models + from kubeflow.training.utils import utils as kfto_utils def list_phase1_final_model(): model_dir = "/output/phase_1/model/hf_format" - models = os.listdir(model_dir) + model_list = os.listdir(model_dir) newest_idx = max( (os.path.getmtime(f"{model_dir}/{model}"), i) - for i, model in enumerate(models) + for i, model in enumerate(model_list) )[-1] - newest_model = models[newest_idx] + newest_model = 
model_list[newest_idx] return f"{model_dir}/{newest_model}" - name = f"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}" - if phase_num == 1: path_to_model = "/input_model" path_to_data = "/input_data/knowledge/data.jsonl" @@ -167,279 +170,197 @@ def list_phase1_final_model(): else: raise RuntimeError(f"Unsupported value of {phase_num=}") - image = "registry.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1" + resources_per_worker = {"nvidia.com/gpu": nproc_per_node} + + name = f"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}" + command = ["/bin/sh", "-c", "--"] + + master_args = [ + f""" + echo "Running phase {phase_num}" + echo "Using {path_to_model} model for training" + echo "Using {path_to_data} data for training" + mkdir -p /output/phase_{phase_num}/model; + mkdir -p /output/data; + torchrun --nnodes {nnodes} \ + --nproc_per_node {nproc_per_node} \ + --node_rank \$(RANK) \ + --rdzv_endpoint \$(MASTER_ADDR):\$(MASTER_PORT) \ + -m instructlab.training.main_ds \ + --model_name_or_path={path_to_model} \ + --data_path={path_to_data} \ + --output_dir=/output/phase_{phase_num}/model \ + --num_epochs={num_epochs} \ + --effective_batch_size={effective_batch_size} \ + --learning_rate={learning_rate} \ + --num_warmup_steps={num_warmup_steps} \ + --save_samples={save_samples} \ + --log_level=INFO \ + --max_batch_len={max_batch_len} \ + --seed={seed} \ + --cpu_offload_optimizer \ + --cpu_offload_params_fsdp \ + --distributed_training_framework fsdp \ + --checkpoint_at_epoch + """ + ] - manifest = inspect.cleandoc( + worker_args = [ f""" - apiVersion: kubeflow.org/v1 - kind: PyTorchJob - metadata: - name: {name} - spec: - nprocPerNode: \"{nproc_per_node}\" - pytorchReplicaSpecs: - Master: - replicas: 1 - restartPolicy: OnFailure - template: - metadata: - annotations: - sidecar.istio.io/inject: 'false' - spec: - containers: - - args: - - | - echo "Running phase {phase_num}" - echo "Using {path_to_model} model for training" - echo "Using {path_to_data} data for training" - mkdir -p /output/phase_{phase_num}/model; - mkdir -p /output/data; - torchrun --nnodes {nnodes} \ - --nproc_per_node {nproc_per_node} \ - --node_rank \$(RANK) \ - --rdzv_endpoint \$(MASTER_ADDR):\$(MASTER_PORT) \ - -m instructlab.training.main_ds \ - --model_name_or_path={path_to_model} \ - --data_path={path_to_data} \ - --output_dir=/output/phase_{phase_num}/model \ - --num_epochs={num_epochs} \ - --effective_batch_size={effective_batch_size} \ - --learning_rate={learning_rate} \ - --num_warmup_steps={num_warmup_steps} \ - --save_samples={save_samples} \ - --log_level=INFO \ - --max_batch_len={max_batch_len} \ - --seed={seed} \ - --cpu_offload_optimizer \ - --cpu_offload_params_fsdp \ - --distributed_training_framework fsdp \ - --checkpoint_at_epoch - command: - - /bin/bash - - '-c' - - '--' - image: {image} - name: pytorch - volumeMounts: - - mountPath: /input_data - name: input-data - readOnly: true - - mountPath: /input_model - name: model - readOnly: true - - mountPath: /output - name: output - env: - - name: NNODES - value: \"{nnodes}\" - - name: NPROC_PER_NODE - value: \"{nproc_per_node}\" - - name: XDG_CACHE_HOME - value: /tmp - - name: TRITON_CACHE_DIR - value: /tmp - - name: HF_HOME - value: /tmp - - name: TRANSFORMERS_CACHE - value: /tmp - resources: - requests: - "nvidia.com/gpu": {nproc_per_node} - limits: - "nvidia.com/gpu": {nproc_per_node} - volumes: - - name: input-data - persistentVolumeClaim: - claimName: {input_pvc_name} - - name: model - persistentVolumeClaim: - claimName: {model_pvc_name} - - name: 
output - persistentVolumeClaim: - claimName: {output_pvc_name} - Worker: - replicas: {nnodes-1} - restartPolicy: OnFailure - template: - metadata: - annotations: - sidecar.istio.io/inject: 'false' - spec: - containers: - - args: - - | - echo "Running phase {phase_num}" - echo "Using {path_to_model} model for training" - echo "Using {path_to_data} data for training" - mkdir -p /tmp/model; - torchrun --nnodes {nnodes} \ - --nproc_per_node {nproc_per_node} \ - --node_rank \$(RANK) \ - --rdzv_endpoint \$(MASTER_ADDR):\$(MASTER_PORT) \ - -m instructlab.training.main_ds \ - --model_name_or_path={path_to_model} \ - --data_path={path_to_data} \ - --output_dir=/tmp/model \ - --num_epochs={num_epochs} \ - --effective_batch_size={effective_batch_size} \ - --learning_rate={learning_rate} \ - --num_warmup_steps={num_warmup_steps} \ - --save_samples={save_samples} \ - --log_level=INFO \ - --max_batch_len={max_batch_len} \ - --seed={seed} \ - --cpu_offload_optimizer \ - --cpu_offload_params_fsdp \ - --distributed_training_framework fsdp \ - --checkpoint_at_epoch - command: - - /bin/bash - - '-c' - - '--' - image: {image} - name: pytorch - volumeMounts: - - mountPath: /input_data - name: input-data - readOnly: true - - mountPath: /input_model - name: model - readOnly: true - - mountPath: /output - name: output - readOnly: true - env: - - name: NNODES - value: \"{nnodes}\" - - name: NPROC_PER_NODE - value: \"{nproc_per_node}\" - - name: XDG_CACHE_HOME - value: /tmp - - name: TRITON_CACHE_DIR - value: /tmp - - name: HF_HOME - value: /tmp - - name: TRANSFORMERS_CACHE - value: /tmp - resources: - requests: - "nvidia.com/gpu": {nproc_per_node} - limits: - "nvidia.com/gpu": {nproc_per_node} - volumes: - - name: input-data - persistentVolumeClaim: - claimName: {input_pvc_name} - - name: model - persistentVolumeClaim: - claimName: {model_pvc_name} - - name: output - persistentVolumeClaim: - claimName: {output_pvc_name} - """ + echo "Running phase {phase_num}" + echo "Using {path_to_model} model for training" + echo "Using {path_to_data} data for training" + mkdir -p /tmp/model; + torchrun --nnodes {nnodes} \ + --nproc_per_node {nproc_per_node} \ + --node_rank \$(RANK) \ + --rdzv_endpoint \$(MASTER_ADDR):\$(MASTER_PORT) \ + -m instructlab.training.main_ds \ + --model_name_or_path={path_to_model} \ + --data_path={path_to_data} \ + --output_dir=/tmp/model \ + --num_epochs={num_epochs} \ + --effective_batch_size={effective_batch_size} \ + --learning_rate={learning_rate} \ + --num_warmup_steps={num_warmup_steps} \ + --save_samples={save_samples} \ + --log_level=INFO \ + --max_batch_len={max_batch_len} \ + --seed={seed} \ + --cpu_offload_optimizer \ + --cpu_offload_params_fsdp \ + --distributed_training_framework fsdp \ + --checkpoint_at_epoch + """ + ] + + # Set volumes + volumes = [ + models.V1Volume( + name="input-data", + persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource( + claim_name=input_pvc_name + ), + ), + models.V1Volume( + name="model", + persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource( + claim_name=model_pvc_name + ), + ), + models.V1Volume( + name="output", + persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource( + claim_name=output_pvc_name + ), + ), + ] + + # Set volume mounts + volume_mounts_master = [ + models.V1VolumeMount( + mount_path="/input_data", name="input-data", read_only=True + ), + models.V1VolumeMount(mount_path="/input_model", name="model", read_only=True), + models.V1VolumeMount(mount_path="/output", name="output"), + ] + + 
volume_mounts_worker = [ + models.V1VolumeMount( + mount_path="/input_data", name="input-data", read_only=True + ), + models.V1VolumeMount(mount_path="/input_model", name="model", read_only=True), + models.V1VolumeMount(mount_path="/output", name="output", read_only=True), + ] + + # Set env variables + env_vars = [ + models.V1EnvVar(name="NNODES", value=f"{nnodes}"), + models.V1EnvVar(name="NPROC_PER_NODE", value=f"{nproc_per_node}"), + models.V1EnvVar(name="XDG_CACHE_HOME", value="/tmp"), + models.V1EnvVar(name="TRITON_CACHE_DIR", value="/tmp"), + models.V1EnvVar(name="HF_HOME", value="/tmp"), + models.V1EnvVar(name="TRANSFORMERS_CACHE", value="/tmp"), + ] + + # Get master and worker container specs + master_container_spec = kfto_utils.get_container_spec( + base_image=base_image, + name="pytorch", + resources=resources_per_worker, + volume_mounts=volume_mounts_master, ) - try: - manifest_yaml = yaml.safe_load(manifest) - except yaml.YAMLError as exc: - raise RuntimeError(f"Error parsing manifest: {exc}") from exc - - # Discover the namespace in which the pod is running - with open( - "/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r", encoding="utf-8" - ) as f: - namespace = f.read().strip() - print(f"The pod is running in the namespace: {namespace}") - - try: - kubernetes.config.load_kube_config() - print("Loaded kube config") - except kubernetes.config.ConfigException: - print("Failed to load kube config. Trying in-cluster config") - kubernetes.config.load_incluster_config() - - api = kubernetes.client.CustomObjectsApi() - try: - api.create_namespaced_custom_object( - group="kubeflow.org", - version="v1", - namespace=namespace, - plural="pytorchjobs", - body=manifest_yaml, - ) - except kubernetes.client.rest.ApiException as exc: - if exc.status == 409: - print( - "{} '{}/{}' already exists.".format( - manifest_yaml["kind"], - namespace, - manifest_yaml["metadata"]["name"], - ) - ) - else: - raise - - # Get the CR status and wait for it to be completed - w = kubernetes.watch.Watch() - exit_flag = False - start_time = time.time() - timeout_seconds = 24 * 60 * 60 # 24 hours - - while not exit_flag: # Keep the watch active - if time.time() - start_time > timeout_seconds: - raise RuntimeError( - "Timeout (24h) reached waiting for the PytorchJob to complete." 
- ) + # In the next release of kubeflow-training, the command + # and the args will be a part of kfto_utils.get_container_spec function + master_container_spec.command = command + master_container_spec.args = master_args + + master_container_spec.env = env_vars + + worker_container_spec = kfto_utils.get_container_spec( + base_image=base_image, + name="pytorch", + resources=resources_per_worker, + volume_mounts=volume_mounts_worker, + ) + worker_container_spec.command = command + worker_container_spec.args = worker_args + worker_container_spec.env = env_vars + + # create master pod spec + master_pod_template_spec = kfto_utils.get_pod_template_spec( + containers=[master_container_spec], + volumes=volumes, + ) + + # create worker pod spec + worker_pod_template_spec = kfto_utils.get_pod_template_spec( + containers=[worker_container_spec], + volumes=volumes, + ) + + logging.getLogger(__name__).setLevel(logging.INFO) + logging.info("Generating job template.") + logging.info("Creating TrainingClient.") + + # Initialize training client + # This also finds the namespace from /var/run/secrets/kubernetes.io/serviceaccount/namespace + # And it also loads the kube config + training_client = TrainingClient() + namespace = training_client.namespace + + # Create pytorch job spec + job_template = kfto_utils.get_pytorchjob_template( + name=name, + namespace=namespace, + worker_pod_template_spec=worker_pod_template_spec, + master_pod_template_spec=master_pod_template_spec, + num_workers=nnodes, + num_procs_per_worker=nproc_per_node, + ) + # Save the pytorch job yaml for record keeping and debugging + with open(pytorchjob_output_yaml.path, "w", encoding="utf-8") as f: + f.write(job_template.to_str()) + + # Run the pytorch job + logging.info(f"Creating PyTorchJob in namespace: {namespace}") + training_client.create_job(job_template, namespace=namespace) + + expected_conditions = ["Succeeded", "Failed"] + logging.info(f"Monitoring job until status is any of {expected_conditions}.") + + def get_logs(job): + _, _ = training_client.get_job_logs(name=job.metadata.name, follow=True) + + training_client.wait_for_job_conditions( + name=name, + expected_conditions=set(expected_conditions), + wait_timeout=job_timeout, + timeout=job_timeout, + callback=get_logs, + ) - try: - print("Watching for PytorchJob") - for event in w.stream( - api.list_namespaced_custom_object, - group="kubeflow.org", - version="v1", - namespace=namespace, - plural="pytorchjobs", - timeout_seconds=60, # Timeout after 1 minute - ): - pytorchjob_event = event["object"] - if ( - pytorchjob_event["metadata"]["name"] - != manifest_yaml["metadata"]["name"] - ): - continue - pytorchjob_name = pytorchjob_event["metadata"]["name"] - - if ( - "status" not in pytorchjob_event - or "conditions" not in pytorchjob_event["status"] - ): - continue - print( - f"PytorchJob: {pytorchjob_name} - {pytorchjob_event['status'].get('conditions', 'No conditions yet')}" - ) - for job_condition in reversed(pytorchjob_event["status"]["conditions"]): - if job_condition["type"] == "Succeeded": - print( - f"PytorchJob '{pytorchjob_name}' completed successfully: {job_condition['reason']}" - ) - print(f"Training phase {phase_num} completed.") - w.stop() - exit_flag = True - # Break here to avoid going into other conditions, we are done - break - elif job_condition["type"] == "Failed": - print( - f"PytorchJob '{pytorchjob_name}' failed: {job_condition['reason']}" - ) - w.stop() - raise RuntimeError("Job failed.") - except kubernetes.client.exceptions.ApiException as e: - 
print(f"API exception occurred: {str(e)}") - time.sleep(5) # Backoff before retrying - # Catches the following error: - # urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength - except urllib3.exceptions.ProtocolError as e: - print(f"Connection broken reconnecting the watcher {str(e)}") - time.sleep(5) # Backoff before retrying - finally: - w.stop() + if delete_after_done: + logging.info("Deleting job after completion.") + training_client.delete_job(name, namespace) diff --git a/utils/consts.py b/utils/consts.py index 0087f2d4..04f271c7 100644 --- a/utils/consts.py +++ b/utils/consts.py @@ -1,4 +1,4 @@ -PYTHON_IMAGE = "quay.io/modh/odh-generic-data-science-notebook@sha256:0efbb3ad6f8f342360cf1f002d40716a39d4c58f69163e053d5bd19b4fe732d4" # v3-2024b-20250115 +PYTHON_IMAGE = "quay.io/modh/odh-generic-data-science-notebook@sha256:72c1d095adbda216a1f1b4b6935e3e2c717cbc58964009464ccd36c0b98312b2" # v3-20250116 TOOLBOX_IMAGE = "registry.redhat.io/ubi9/toolbox@sha256:da31dee8904a535d12689346e65e5b00d11a6179abf1fa69b548dbd755fa2770" # v9.5 OC_IMAGE = "registry.redhat.io/openshift4/ose-cli@sha256:08bdbfae224dd39c81689ee73c183619d6b41eba7ac04f0dce7ee79f50531d0b" # v4.15.0 RHELAI_IMAGE = "registry.redhat.io/rhelai1/instructlab-nvidia-rhel9@sha256:05cfba1fb13ed54b1de4d021da2a31dd78ba7d8cc48e10c7fe372815899a18ae" # v1.3.2