diff --git a/pipeline.py b/pipeline.py index aeefc6c8..c5435da2 100644 --- a/pipeline.py +++ b/pipeline.py @@ -27,7 +27,7 @@ from training import ( data_processing_op, knowledge_processed_data_to_artifact_op, - pytorchjob_manifest_op, + pytorch_job_launcher_op, skills_processed_data_to_artifact_op, ) from utils import ( @@ -36,6 +36,7 @@ pvc_to_model_op, pvc_to_mt_bench_op, ) +from utils.consts import RHELAI_IMAGE TEACHER_CONFIG_MAP = "teacher-server" TEACHER_SECRET = "teacher-server" @@ -264,12 +265,13 @@ def ilab_pipeline( # Training 1 # Using pvc_create_task.output as PyTorchJob name since dsl.PIPELINE_* global variables do not template/work in KFP v2 # https://github.com/kubeflow/pipelines/issues/10453 - training_phase_1 = pytorchjob_manifest_op( + training_phase_1 = pytorch_job_launcher_op( model_pvc_name=model_pvc_task.output, input_pvc_name=sdg_input_pvc_task.output, name_suffix=sdg_input_pvc_task.output, output_pvc_name=output_pvc_task.output, phase_num=1, + base_image=RHELAI_IMAGE, nproc_per_node=train_nproc_per_node, nnodes=train_nnodes, num_epochs=train_num_epochs_phase_1, @@ -284,12 +286,13 @@ def ilab_pipeline( training_phase_1.set_caching_options(False) #### Train 2 - training_phase_2 = pytorchjob_manifest_op( + training_phase_2 = pytorch_job_launcher_op( model_pvc_name=model_pvc_task.output, input_pvc_name=sdg_input_pvc_task.output, name_suffix=sdg_input_pvc_task.output, output_pvc_name=output_pvc_task.output, phase_num=2, + base_image=RHELAI_IMAGE, nproc_per_node=train_nproc_per_node, nnodes=train_nnodes, num_epochs=train_num_epochs_phase_2, diff --git a/pipeline.yaml b/pipeline.yaml index e00d8619..e3d77635 100644 --- a/pipeline.yaml +++ b/pipeline.yaml @@ -343,16 +343,26 @@ components: artifactType: schemaTitle: system.Artifact schemaVersion: 0.0.1 - comp-pytorchjob-manifest-op: - executorLabel: exec-pytorchjob-manifest-op + comp-pytorch-job-launcher-op: + executorLabel: exec-pytorch-job-launcher-op inputDefinitions: parameters: + base_image: + parameterType: STRING + delete_after_done: + defaultValue: false + isOptional: true + parameterType: BOOLEAN effective_batch_size: defaultValue: 3840.0 isOptional: true parameterType: NUMBER_INTEGER input_pvc_name: parameterType: STRING + job_timeout: + defaultValue: 86400.0 + isOptional: true + parameterType: NUMBER_INTEGER learning_rate: defaultValue: 0.0001 isOptional: true @@ -393,16 +403,32 @@ components: defaultValue: 42.0 isOptional: true parameterType: NUMBER_INTEGER - comp-pytorchjob-manifest-op-2: - executorLabel: exec-pytorchjob-manifest-op-2 + outputDefinitions: + artifacts: + pytorchjob_output_yaml: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + comp-pytorch-job-launcher-op-2: + executorLabel: exec-pytorch-job-launcher-op-2 inputDefinitions: parameters: + base_image: + parameterType: STRING + delete_after_done: + defaultValue: false + isOptional: true + parameterType: BOOLEAN effective_batch_size: defaultValue: 3840.0 isOptional: true parameterType: NUMBER_INTEGER input_pvc_name: parameterType: STRING + job_timeout: + defaultValue: 86400.0 + isOptional: true + parameterType: NUMBER_INTEGER learning_rate: defaultValue: 0.0001 isOptional: true @@ -443,6 +469,12 @@ components: defaultValue: 42.0 isOptional: true parameterType: NUMBER_INTEGER + outputDefinitions: + artifacts: + pytorchjob_output_yaml: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 comp-run-final-eval-op: executorLabel: exec-run-final-eval-op inputDefinitions: @@ -711,13 +743,13 @@ deploymentSpec: - /bin/sh - 
-c image: registry.redhat.io/ubi9/toolbox@sha256:da31dee8904a535d12689346e65e5b00d11a6179abf1fa69b548dbd755fa2770 - exec-pytorchjob-manifest-op: + exec-pytorch-job-launcher-op: container: args: - --executor_input - '{{$}}' - --function_to_execute - - pytorchjob_manifest_op + - pytorch_job_launcher_op command: - sh - -ec @@ -730,198 +762,124 @@ deploymentSpec: ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef pytorchjob_manifest_op(\n model_pvc_name: str,\n input_pvc_name:\ - \ str,\n output_pvc_name: str,\n name_suffix: str,\n # path_to_model:\ - \ str,\n phase_num: int,\n nproc_per_node: int = 3,\n nnodes: int\ - \ = 2,\n num_epochs: int = 2,\n effective_batch_size: int = 3840,\n\ - \ learning_rate: float = 1e-4,\n num_warmup_steps: int = 800,\n \ - \ save_samples: int = 0,\n max_batch_len: int = 20000,\n seed: int\ - \ = 42,\n):\n import inspect\n import os\n import time\n\n import\ - \ kubernetes\n import urllib3\n import yaml\n\n def list_phase1_final_model():\n\ - \ model_dir = \"/output/phase_1/model/hf_format\"\n models\ - \ = os.listdir(model_dir)\n newest_idx = max(\n (os.path.getmtime(f\"\ - {model_dir}/{model}\"), i)\n for i, model in enumerate(models)\n\ - \ )[-1]\n newest_model = models[newest_idx]\n return\ - \ f\"{model_dir}/{newest_model}\"\n\n name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\ + \ *\n\ndef pytorch_job_launcher_op(\n pytorchjob_output_yaml: dsl.Output[dsl.Artifact],\n\ + \ model_pvc_name: str,\n input_pvc_name: str,\n output_pvc_name:\ + \ str,\n name_suffix: str,\n phase_num: int,\n base_image: str,\n\ + \ nproc_per_node: int = 3,\n nnodes: int = 2,\n num_epochs: int\ + \ = 2,\n effective_batch_size: int = 3840,\n learning_rate: float\ + \ = 1e-4,\n num_warmup_steps: int = 800,\n save_samples: int = 0,\n\ + \ max_batch_len: int = 20000,\n seed: int = 42,\n job_timeout:\ + \ int = 86400,\n delete_after_done: bool = False,\n):\n import logging\n\ + \ import os\n\n from kubeflow.training import TrainingClient, models\n\ + \ from kubeflow.training.utils import utils as kfto_utils\n\n def\ + \ list_phase1_final_model():\n model_dir = \"/output/phase_1/model/hf_format\"\ + \n model_list = os.listdir(model_dir)\n newest_idx = max(\n\ + \ (os.path.getmtime(f\"{model_dir}/{model}\"), i)\n \ + \ for i, model in enumerate(model_list)\n )[-1]\n newest_model\ + \ = model_list[newest_idx]\n return f\"{model_dir}/{newest_model}\"\ \n\n if phase_num == 1:\n path_to_model = \"/input_model\"\n \ \ path_to_data = \"/input_data/knowledge/data.jsonl\"\n elif phase_num\ \ == 2:\n path_to_model = list_phase1_final_model()\n path_to_data\ \ = \"/input_data/skills/data.jsonl\"\n else:\n raise RuntimeError(f\"\ - Unsupported value of {phase_num=}\")\n\n image = \"registry.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1\"\ - \n\n manifest = inspect.cleandoc(\n f\"\"\"\n apiVersion:\ - \ kubeflow.org/v1\n kind: PyTorchJob\n metadata:\n \ - \ name: {name}\n spec:\n nprocPerNode: \\\"{nproc_per_node}\\\ - \"\n pytorchReplicaSpecs:\n Master:\n replicas:\ - \ 1\n restartPolicy: OnFailure\n template:\n \ - \ metadata:\n annotations:\n \ - \ sidecar.istio.io/inject: 'false'\n spec:\n \ - \ containers:\n - args:\n \ - \ - |\n echo \"Running phase {phase_num}\"\ - \n echo \"Using {path_to_model} model for training\"\ - \n echo \"Using {path_to_data} data for training\"\ - \n mkdir -p /output/phase_{phase_num}/model;\n\ - \ mkdir -p /output/data;\n \ - \ torchrun --nnodes {nnodes} \\\n --nproc_per_node\ - \ {nproc_per_node} \\\n 
--node_rank \\$(RANK)\ - \ \\\n --rdzv_endpoint \\$(MASTER_ADDR):\\\ - $(MASTER_PORT) \\\n -m instructlab.training.main_ds\ - \ \\\n --model_name_or_path={path_to_model}\ - \ \\\n --data_path={path_to_data} \\\n \ - \ --output_dir=/output/phase_{phase_num}/model\ - \ \\\n --num_epochs={num_epochs} \\\n \ - \ --effective_batch_size={effective_batch_size}\ - \ \\\n --learning_rate={learning_rate} \\\n\ - \ --num_warmup_steps={num_warmup_steps} \\\n\ - \ --save_samples={save_samples} \\\n \ - \ --log_level=INFO \\\n \ - \ --max_batch_len={max_batch_len} \\\n \ - \ --seed={seed} \\\n --cpu_offload_optimizer\ - \ \\\n --cpu_offload_params_fsdp \\\n \ - \ --distributed_training_framework fsdp \\\n \ - \ --checkpoint_at_epoch\n \ - \ command:\n - /bin/bash\n \ - \ - '-c'\n - '--'\n image:\ - \ {image}\n name: pytorch\n volumeMounts:\n\ - \ - mountPath: /input_data\n \ - \ name: input-data\n readOnly: true\n \ - \ - mountPath: /input_model\n \ - \ name: model\n readOnly: true\n \ - \ - mountPath: /output\n name: output\n\ - \ env:\n - name: NNODES\n \ - \ value: \\\"{nnodes}\\\"\n \ - \ - name: NPROC_PER_NODE\n value: \\\"{nproc_per_node}\\\ - \"\n - name: XDG_CACHE_HOME\n \ - \ value: /tmp\n - name: TRITON_CACHE_DIR\n\ - \ value: /tmp\n - name:\ - \ HF_HOME\n value: /tmp\n \ - \ - name: TRANSFORMERS_CACHE\n value: /tmp\n\ - \ resources:\n requests:\n \ - \ \"nvidia.com/gpu\": {nproc_per_node}\n \ - \ limits:\n \"nvidia.com/gpu\"\ - : {nproc_per_node}\n volumes:\n - name:\ - \ input-data\n persistentVolumeClaim:\n \ - \ claimName: {input_pvc_name}\n - name: model\n\ - \ persistentVolumeClaim:\n claimName:\ - \ {model_pvc_name}\n - name: output\n \ - \ persistentVolumeClaim:\n claimName: {output_pvc_name}\n\ - \ Worker:\n replicas: {nnodes-1}\n \ - \ restartPolicy: OnFailure\n template:\n metadata:\n\ - \ annotations:\n sidecar.istio.io/inject:\ - \ 'false'\n spec:\n containers:\n \ - \ - args:\n - |\n \ - \ echo \"Running phase {phase_num}\"\n echo\ - \ \"Using {path_to_model} model for training\"\n \ - \ echo \"Using {path_to_data} data for training\"\n \ - \ mkdir -p /tmp/model;\n torchrun --nnodes\ - \ {nnodes} \\\n --nproc_per_node {nproc_per_node}\ - \ \\\n --node_rank \\$(RANK) \\\n \ - \ --rdzv_endpoint \\$(MASTER_ADDR):\\$(MASTER_PORT) \\\n\ - \ -m instructlab.training.main_ds \\\n \ - \ --model_name_or_path={path_to_model} \\\n \ - \ --data_path={path_to_data} \\\n \ - \ --output_dir=/tmp/model \\\n --num_epochs={num_epochs}\ - \ \\\n --effective_batch_size={effective_batch_size}\ - \ \\\n --learning_rate={learning_rate} \\\n \ - \ --num_warmup_steps={num_warmup_steps} \\\n \ - \ --save_samples={save_samples} \\\n \ - \ --log_level=INFO \\\n --max_batch_len={max_batch_len}\ - \ \\\n --seed={seed} \\\n \ - \ --cpu_offload_optimizer \\\n --cpu_offload_params_fsdp\ - \ \\\n --distributed_training_framework fsdp\ - \ \\\n --checkpoint_at_epoch\n \ - \ command:\n - /bin/bash\n \ - \ - '-c'\n - '--'\n \ - \ image: {image}\n name: pytorch\n \ - \ volumeMounts:\n - mountPath: /input_data\n\ - \ name: input-data\n readOnly:\ - \ true\n - mountPath: /input_model\n \ - \ name: model\n readOnly: true\n \ - \ - mountPath: /output\n \ - \ name: output\n readOnly: true\n \ - \ env:\n - name: NNODES\n \ - \ value: \\\"{nnodes}\\\"\n - name: NPROC_PER_NODE\n\ - \ value: \\\"{nproc_per_node}\\\"\n \ - \ - name: XDG_CACHE_HOME\n value: /tmp\n\ - \ - name: TRITON_CACHE_DIR\n \ - \ value: /tmp\n - name: HF_HOME\n \ - \ value: /tmp\n - name: TRANSFORMERS_CACHE\n\ - \ value: /tmp\n resources:\n\ - \ requests:\n 
\"nvidia.com/gpu\"\ - : {nproc_per_node}\n limits:\n \ - \ \"nvidia.com/gpu\": {nproc_per_node}\n volumes:\n\ - \ - name: input-data\n persistentVolumeClaim:\n\ - \ claimName: {input_pvc_name}\n \ - \ - name: model\n persistentVolumeClaim:\n \ - \ claimName: {model_pvc_name}\n - name:\ - \ output\n persistentVolumeClaim:\n \ - \ claimName: {output_pvc_name}\n \"\"\"\n )\n\n try:\n\ - \ manifest_yaml = yaml.safe_load(manifest)\n except yaml.YAMLError\ - \ as exc:\n raise RuntimeError(f\"Error parsing manifest: {exc}\"\ - ) from exc\n\n # Discover the namespace in which the pod is running\n\ - \ with open(\n \"/var/run/secrets/kubernetes.io/serviceaccount/namespace\"\ - , \"r\", encoding=\"utf-8\"\n ) as f:\n namespace = f.read().strip()\n\ - \ print(f\"The pod is running in the namespace: {namespace}\")\n\n\ - \ try:\n kubernetes.config.load_kube_config()\n print(\"\ - Loaded kube config\")\n except kubernetes.config.ConfigException:\n \ - \ print(\"Failed to load kube config. Trying in-cluster config\")\n\ - \ kubernetes.config.load_incluster_config()\n\n api = kubernetes.client.CustomObjectsApi()\n\ - \ try:\n api.create_namespaced_custom_object(\n group=\"\ - kubeflow.org\",\n version=\"v1\",\n namespace=namespace,\n\ - \ plural=\"pytorchjobs\",\n body=manifest_yaml,\n\ - \ )\n except kubernetes.client.rest.ApiException as exc:\n \ - \ if exc.status == 409:\n print(\n \"{} '{}/{}'\ - \ already exists.\".format(\n manifest_yaml[\"kind\"\ - ],\n namespace,\n manifest_yaml[\"\ - metadata\"][\"name\"],\n )\n )\n else:\n\ - \ raise\n\n # Get the CR status and wait for it to be completed\n\ - \ w = kubernetes.watch.Watch()\n exit_flag = False\n start_time\ - \ = time.time()\n timeout_seconds = 24 * 60 * 60 # 24 hours\n\n while\ - \ not exit_flag: # Keep the watch active\n if time.time() - start_time\ - \ > timeout_seconds:\n raise RuntimeError(\n \"\ - Timeout (24h) reached waiting for the PytorchJob to complete.\"\n \ - \ )\n\n try:\n print(\"Watching for PytorchJob\"\ - )\n for event in w.stream(\n api.list_namespaced_custom_object,\n\ - \ group=\"kubeflow.org\",\n version=\"v1\"\ - ,\n namespace=namespace,\n plural=\"pytorchjobs\"\ - ,\n timeout_seconds=60, # Timeout after 1 minute\n \ - \ ):\n pytorchjob_event = event[\"object\"]\n \ - \ if (\n pytorchjob_event[\"metadata\"][\"\ - name\"]\n != manifest_yaml[\"metadata\"][\"name\"]\n\ - \ ):\n continue\n pytorchjob_name\ - \ = pytorchjob_event[\"metadata\"][\"name\"]\n\n if (\n \ - \ \"status\" not in pytorchjob_event\n \ - \ or \"conditions\" not in pytorchjob_event[\"status\"]\n \ - \ ):\n continue\n print(\n \ - \ f\"PytorchJob: {pytorchjob_name} - {pytorchjob_event['status'].get('conditions',\ - \ 'No conditions yet')}\"\n )\n for job_condition\ - \ in reversed(pytorchjob_event[\"status\"][\"conditions\"]):\n \ - \ if job_condition[\"type\"] == \"Succeeded\":\n \ - \ print(\n f\"PytorchJob '{pytorchjob_name}'\ - \ completed successfully: {job_condition['reason']}\"\n \ - \ )\n print(f\"Training phase {phase_num}\ - \ completed.\")\n w.stop()\n \ - \ exit_flag = True\n # Break here to avoid going\ - \ into other conditions, we are done\n break\n \ - \ elif job_condition[\"type\"] == \"Failed\":\n \ - \ print(\n f\"PytorchJob '{pytorchjob_name}'\ - \ failed: {job_condition['reason']}\"\n )\n \ - \ w.stop()\n raise RuntimeError(\"\ - Job failed.\")\n except kubernetes.client.exceptions.ApiException\ - \ as e:\n print(f\"API exception occurred: {str(e)}\")\n \ - \ time.sleep(5) # Backoff before retrying\n # Catches the\ - \ following 
error:\n # urllib3.exceptions.ProtocolError: (\"Connection\ - \ broken: InvalidChunkLength\n except urllib3.exceptions.ProtocolError\ - \ as e:\n print(f\"Connection broken reconnecting the watcher\ - \ {str(e)}\")\n time.sleep(5) # Backoff before retrying\n \ - \ finally:\n w.stop()\n\n" - image: quay.io/modh/odh-generic-data-science-notebook@sha256:0efbb3ad6f8f342360cf1f002d40716a39d4c58f69163e053d5bd19b4fe732d4 - exec-pytorchjob-manifest-op-2: + Unsupported value of {phase_num=}\")\n\n resources_per_worker = {\"nvidia.com/gpu\"\ + : nproc_per_node}\n\n name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\ + \n command = [\"/bin/sh\", \"-c\", \"--\"]\n\n master_args = [\n \ + \ f\"\"\"\n echo \"Running phase {phase_num}\"\n echo\ + \ \"Using {path_to_model} model for training\"\n echo \"Using {path_to_data}\ + \ data for training\"\n mkdir -p /output/phase_{phase_num}/model;\n\ + \ mkdir -p /output/data;\n torchrun --nnodes {nnodes} \\\n\ + \ --nproc_per_node {nproc_per_node} \\\n --node_rank\ + \ \\$(RANK) \\\n --rdzv_endpoint \\$(MASTER_ADDR):\\$(MASTER_PORT)\ + \ \\\n -m instructlab.training.main_ds \\\n --model_name_or_path={path_to_model}\ + \ \\\n --data_path={path_to_data} \\\n --output_dir=/output/phase_{phase_num}/model\ + \ \\\n --num_epochs={num_epochs} \\\n --effective_batch_size={effective_batch_size}\ + \ \\\n --learning_rate={learning_rate} \\\n --num_warmup_steps={num_warmup_steps}\ + \ \\\n --save_samples={save_samples} \\\n --log_level=INFO\ + \ \\\n --max_batch_len={max_batch_len} \\\n --seed={seed}\ + \ \\\n --cpu_offload_optimizer \\\n --cpu_offload_params_fsdp\ + \ \\\n --distributed_training_framework fsdp \\\n \ + \ --checkpoint_at_epoch\n \"\"\"\n ]\n\n worker_args =\ + \ [\n f\"\"\"\n echo \"Running phase {phase_num}\"\n \ + \ echo \"Using {path_to_model} model for training\"\n echo \"Using\ + \ {path_to_data} data for training\"\n mkdir -p /tmp/model;\n \ + \ torchrun --nnodes {nnodes} \\\n --nproc_per_node {nproc_per_node}\ + \ \\\n --node_rank \\$(RANK) \\\n --rdzv_endpoint \\$(MASTER_ADDR):\\\ + $(MASTER_PORT) \\\n -m instructlab.training.main_ds \\\n \ + \ --model_name_or_path={path_to_model} \\\n --data_path={path_to_data}\ + \ \\\n --output_dir=/tmp/model \\\n --num_epochs={num_epochs}\ + \ \\\n --effective_batch_size={effective_batch_size} \\\n \ + \ --learning_rate={learning_rate} \\\n --num_warmup_steps={num_warmup_steps}\ + \ \\\n --save_samples={save_samples} \\\n --log_level=INFO\ + \ \\\n --max_batch_len={max_batch_len} \\\n --seed={seed}\ + \ \\\n --cpu_offload_optimizer \\\n --cpu_offload_params_fsdp\ + \ \\\n --distributed_training_framework fsdp \\\n --checkpoint_at_epoch\n\ + \ \"\"\"\n ]\n\n # Set volumes\n volumes = [\n \ + \ models.V1Volume(\n name=\"input-data\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\ + \ claim_name=input_pvc_name\n ),\n ),\n\ + \ models.V1Volume(\n name=\"model\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\ + \ claim_name=model_pvc_name\n ),\n ),\n\ + \ models.V1Volume(\n name=\"output\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\ + \ claim_name=output_pvc_name\n ),\n ),\n\ + \ ]\n\n # Set volume mounts\n volume_mounts_master = [\n \ + \ models.V1VolumeMount(\n mount_path=\"/input_data\", name=\"\ + input-data\", read_only=True\n ),\n models.V1VolumeMount(mount_path=\"\ + /input_model\", name=\"model\", read_only=True),\n models.V1VolumeMount(mount_path=\"\ + /output\", name=\"output\"),\n ]\n\n volume_mounts_worker = 
[\n \ + \ models.V1VolumeMount(\n mount_path=\"/input_data\", name=\"\ + input-data\", read_only=True\n ),\n models.V1VolumeMount(mount_path=\"\ + /input_model\", name=\"model\", read_only=True),\n models.V1VolumeMount(mount_path=\"\ + /output\", name=\"output\", read_only=True),\n ]\n\n # Set env variables\n\ + \ env_vars = [\n models.V1EnvVar(name=\"NNODES\", value=f\"{nnodes}\"\ + ),\n models.V1EnvVar(name=\"NPROC_PER_NODE\", value=f\"{nproc_per_node}\"\ + ),\n models.V1EnvVar(name=\"XDG_CACHE_HOME\", value=\"/tmp\"),\n\ + \ models.V1EnvVar(name=\"TRITON_CACHE_DIR\", value=\"/tmp\"),\n \ + \ models.V1EnvVar(name=\"HF_HOME\", value=\"/tmp\"),\n models.V1EnvVar(name=\"\ + TRANSFORMERS_CACHE\", value=\"/tmp\"),\n ]\n\n # Get master and worker\ + \ container specs\n master_container_spec = kfto_utils.get_container_spec(\n\ + \ base_image=base_image,\n name=\"pytorch\",\n resources=resources_per_worker,\n\ + \ volume_mounts=volume_mounts_master,\n )\n\n # In the next\ + \ release of kubeflow-training, the command\n # and the args will be\ + \ a part of kfto_utils.get_container_spec function\n master_container_spec.command\ + \ = command\n master_container_spec.args = master_args\n\n master_container_spec.env\ + \ = env_vars\n\n worker_container_spec = kfto_utils.get_container_spec(\n\ + \ base_image=base_image,\n name=\"pytorch\",\n resources=resources_per_worker,\n\ + \ volume_mounts=volume_mounts_worker,\n )\n worker_container_spec.command\ + \ = command\n worker_container_spec.args = worker_args\n worker_container_spec.env\ + \ = env_vars\n\n # create master pod spec\n master_pod_template_spec\ + \ = kfto_utils.get_pod_template_spec(\n containers=[master_container_spec],\n\ + \ volumes=volumes,\n )\n\n # create worker pod spec\n worker_pod_template_spec\ + \ = kfto_utils.get_pod_template_spec(\n containers=[worker_container_spec],\n\ + \ volumes=volumes,\n )\n\n logging.getLogger(__name__).setLevel(logging.INFO)\n\ + \ logging.info(\"Generating job template.\")\n logging.info(\"Creating\ + \ TrainingClient.\")\n\n # Initialize training client\n # This also\ + \ finds the namespace from /var/run/secrets/kubernetes.io/serviceaccount/namespace\n\ + \ # And it also loads the kube config\n training_client = TrainingClient()\n\ + \ namespace = training_client.namespace\n\n # Create pytorch job spec\n\ + \ job_template = kfto_utils.get_pytorchjob_template(\n name=name,\n\ + \ namespace=namespace,\n worker_pod_template_spec=worker_pod_template_spec,\n\ + \ master_pod_template_spec=master_pod_template_spec,\n num_workers=nnodes,\n\ + \ num_procs_per_worker=nproc_per_node,\n )\n # Save the pytorch\ + \ job yaml for record keeping and debugging\n with open(pytorchjob_output_yaml.path,\ + \ \"w\", encoding=\"utf-8\") as f:\n f.write(job_template.to_str())\n\ + \n # Run the pytorch job\n logging.info(f\"Creating PyTorchJob in\ + \ namespace: {namespace}\")\n training_client.create_job(job_template,\ + \ namespace=namespace)\n\n expected_conditions = [\"Succeeded\", \"Failed\"\ + ]\n logging.info(f\"Monitoring job until status is any of {expected_conditions}.\"\ + )\n\n def get_logs(job):\n _, _ = training_client.get_job_logs(name=job.metadata.name,\ + \ follow=True)\n\n training_client.wait_for_job_conditions(\n \ + \ name=name,\n expected_conditions=set(expected_conditions),\n \ + \ wait_timeout=job_timeout,\n timeout=job_timeout,\n \ + \ callback=get_logs,\n )\n\n if delete_after_done:\n logging.info(\"\ + Deleting job after completion.\")\n training_client.delete_job(name,\ + \ namespace)\n\n" + 
image: quay.io/modh/odh-generic-data-science-notebook@sha256:72c1d095adbda216a1f1b4b6935e3e2c717cbc58964009464ccd36c0b98312b2 + exec-pytorch-job-launcher-op-2: container: args: - --executor_input - '{{$}}' - --function_to_execute - - pytorchjob_manifest_op + - pytorch_job_launcher_op command: - sh - -ec @@ -934,191 +892,117 @@ deploymentSpec: ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef pytorchjob_manifest_op(\n model_pvc_name: str,\n input_pvc_name:\ - \ str,\n output_pvc_name: str,\n name_suffix: str,\n # path_to_model:\ - \ str,\n phase_num: int,\n nproc_per_node: int = 3,\n nnodes: int\ - \ = 2,\n num_epochs: int = 2,\n effective_batch_size: int = 3840,\n\ - \ learning_rate: float = 1e-4,\n num_warmup_steps: int = 800,\n \ - \ save_samples: int = 0,\n max_batch_len: int = 20000,\n seed: int\ - \ = 42,\n):\n import inspect\n import os\n import time\n\n import\ - \ kubernetes\n import urllib3\n import yaml\n\n def list_phase1_final_model():\n\ - \ model_dir = \"/output/phase_1/model/hf_format\"\n models\ - \ = os.listdir(model_dir)\n newest_idx = max(\n (os.path.getmtime(f\"\ - {model_dir}/{model}\"), i)\n for i, model in enumerate(models)\n\ - \ )[-1]\n newest_model = models[newest_idx]\n return\ - \ f\"{model_dir}/{newest_model}\"\n\n name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\ + \ *\n\ndef pytorch_job_launcher_op(\n pytorchjob_output_yaml: dsl.Output[dsl.Artifact],\n\ + \ model_pvc_name: str,\n input_pvc_name: str,\n output_pvc_name:\ + \ str,\n name_suffix: str,\n phase_num: int,\n base_image: str,\n\ + \ nproc_per_node: int = 3,\n nnodes: int = 2,\n num_epochs: int\ + \ = 2,\n effective_batch_size: int = 3840,\n learning_rate: float\ + \ = 1e-4,\n num_warmup_steps: int = 800,\n save_samples: int = 0,\n\ + \ max_batch_len: int = 20000,\n seed: int = 42,\n job_timeout:\ + \ int = 86400,\n delete_after_done: bool = False,\n):\n import logging\n\ + \ import os\n\n from kubeflow.training import TrainingClient, models\n\ + \ from kubeflow.training.utils import utils as kfto_utils\n\n def\ + \ list_phase1_final_model():\n model_dir = \"/output/phase_1/model/hf_format\"\ + \n model_list = os.listdir(model_dir)\n newest_idx = max(\n\ + \ (os.path.getmtime(f\"{model_dir}/{model}\"), i)\n \ + \ for i, model in enumerate(model_list)\n )[-1]\n newest_model\ + \ = model_list[newest_idx]\n return f\"{model_dir}/{newest_model}\"\ \n\n if phase_num == 1:\n path_to_model = \"/input_model\"\n \ \ path_to_data = \"/input_data/knowledge/data.jsonl\"\n elif phase_num\ \ == 2:\n path_to_model = list_phase1_final_model()\n path_to_data\ \ = \"/input_data/skills/data.jsonl\"\n else:\n raise RuntimeError(f\"\ - Unsupported value of {phase_num=}\")\n\n image = \"registry.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1\"\ - \n\n manifest = inspect.cleandoc(\n f\"\"\"\n apiVersion:\ - \ kubeflow.org/v1\n kind: PyTorchJob\n metadata:\n \ - \ name: {name}\n spec:\n nprocPerNode: \\\"{nproc_per_node}\\\ - \"\n pytorchReplicaSpecs:\n Master:\n replicas:\ - \ 1\n restartPolicy: OnFailure\n template:\n \ - \ metadata:\n annotations:\n \ - \ sidecar.istio.io/inject: 'false'\n spec:\n \ - \ containers:\n - args:\n \ - \ - |\n echo \"Running phase {phase_num}\"\ - \n echo \"Using {path_to_model} model for training\"\ - \n echo \"Using {path_to_data} data for training\"\ - \n mkdir -p /output/phase_{phase_num}/model;\n\ - \ mkdir -p /output/data;\n \ - \ torchrun --nnodes {nnodes} \\\n --nproc_per_node\ - \ {nproc_per_node} \\\n --node_rank 
\\$(RANK)\ - \ \\\n --rdzv_endpoint \\$(MASTER_ADDR):\\\ - $(MASTER_PORT) \\\n -m instructlab.training.main_ds\ - \ \\\n --model_name_or_path={path_to_model}\ - \ \\\n --data_path={path_to_data} \\\n \ - \ --output_dir=/output/phase_{phase_num}/model\ - \ \\\n --num_epochs={num_epochs} \\\n \ - \ --effective_batch_size={effective_batch_size}\ - \ \\\n --learning_rate={learning_rate} \\\n\ - \ --num_warmup_steps={num_warmup_steps} \\\n\ - \ --save_samples={save_samples} \\\n \ - \ --log_level=INFO \\\n \ - \ --max_batch_len={max_batch_len} \\\n \ - \ --seed={seed} \\\n --cpu_offload_optimizer\ - \ \\\n --cpu_offload_params_fsdp \\\n \ - \ --distributed_training_framework fsdp \\\n \ - \ --checkpoint_at_epoch\n \ - \ command:\n - /bin/bash\n \ - \ - '-c'\n - '--'\n image:\ - \ {image}\n name: pytorch\n volumeMounts:\n\ - \ - mountPath: /input_data\n \ - \ name: input-data\n readOnly: true\n \ - \ - mountPath: /input_model\n \ - \ name: model\n readOnly: true\n \ - \ - mountPath: /output\n name: output\n\ - \ env:\n - name: NNODES\n \ - \ value: \\\"{nnodes}\\\"\n \ - \ - name: NPROC_PER_NODE\n value: \\\"{nproc_per_node}\\\ - \"\n - name: XDG_CACHE_HOME\n \ - \ value: /tmp\n - name: TRITON_CACHE_DIR\n\ - \ value: /tmp\n - name:\ - \ HF_HOME\n value: /tmp\n \ - \ - name: TRANSFORMERS_CACHE\n value: /tmp\n\ - \ resources:\n requests:\n \ - \ \"nvidia.com/gpu\": {nproc_per_node}\n \ - \ limits:\n \"nvidia.com/gpu\"\ - : {nproc_per_node}\n volumes:\n - name:\ - \ input-data\n persistentVolumeClaim:\n \ - \ claimName: {input_pvc_name}\n - name: model\n\ - \ persistentVolumeClaim:\n claimName:\ - \ {model_pvc_name}\n - name: output\n \ - \ persistentVolumeClaim:\n claimName: {output_pvc_name}\n\ - \ Worker:\n replicas: {nnodes-1}\n \ - \ restartPolicy: OnFailure\n template:\n metadata:\n\ - \ annotations:\n sidecar.istio.io/inject:\ - \ 'false'\n spec:\n containers:\n \ - \ - args:\n - |\n \ - \ echo \"Running phase {phase_num}\"\n echo\ - \ \"Using {path_to_model} model for training\"\n \ - \ echo \"Using {path_to_data} data for training\"\n \ - \ mkdir -p /tmp/model;\n torchrun --nnodes\ - \ {nnodes} \\\n --nproc_per_node {nproc_per_node}\ - \ \\\n --node_rank \\$(RANK) \\\n \ - \ --rdzv_endpoint \\$(MASTER_ADDR):\\$(MASTER_PORT) \\\n\ - \ -m instructlab.training.main_ds \\\n \ - \ --model_name_or_path={path_to_model} \\\n \ - \ --data_path={path_to_data} \\\n \ - \ --output_dir=/tmp/model \\\n --num_epochs={num_epochs}\ - \ \\\n --effective_batch_size={effective_batch_size}\ - \ \\\n --learning_rate={learning_rate} \\\n \ - \ --num_warmup_steps={num_warmup_steps} \\\n \ - \ --save_samples={save_samples} \\\n \ - \ --log_level=INFO \\\n --max_batch_len={max_batch_len}\ - \ \\\n --seed={seed} \\\n \ - \ --cpu_offload_optimizer \\\n --cpu_offload_params_fsdp\ - \ \\\n --distributed_training_framework fsdp\ - \ \\\n --checkpoint_at_epoch\n \ - \ command:\n - /bin/bash\n \ - \ - '-c'\n - '--'\n \ - \ image: {image}\n name: pytorch\n \ - \ volumeMounts:\n - mountPath: /input_data\n\ - \ name: input-data\n readOnly:\ - \ true\n - mountPath: /input_model\n \ - \ name: model\n readOnly: true\n \ - \ - mountPath: /output\n \ - \ name: output\n readOnly: true\n \ - \ env:\n - name: NNODES\n \ - \ value: \\\"{nnodes}\\\"\n - name: NPROC_PER_NODE\n\ - \ value: \\\"{nproc_per_node}\\\"\n \ - \ - name: XDG_CACHE_HOME\n value: /tmp\n\ - \ - name: TRITON_CACHE_DIR\n \ - \ value: /tmp\n - name: HF_HOME\n \ - \ value: /tmp\n - name: TRANSFORMERS_CACHE\n\ - \ value: /tmp\n resources:\n\ - \ requests:\n 
\"nvidia.com/gpu\"\ - : {nproc_per_node}\n limits:\n \ - \ \"nvidia.com/gpu\": {nproc_per_node}\n volumes:\n\ - \ - name: input-data\n persistentVolumeClaim:\n\ - \ claimName: {input_pvc_name}\n \ - \ - name: model\n persistentVolumeClaim:\n \ - \ claimName: {model_pvc_name}\n - name:\ - \ output\n persistentVolumeClaim:\n \ - \ claimName: {output_pvc_name}\n \"\"\"\n )\n\n try:\n\ - \ manifest_yaml = yaml.safe_load(manifest)\n except yaml.YAMLError\ - \ as exc:\n raise RuntimeError(f\"Error parsing manifest: {exc}\"\ - ) from exc\n\n # Discover the namespace in which the pod is running\n\ - \ with open(\n \"/var/run/secrets/kubernetes.io/serviceaccount/namespace\"\ - , \"r\", encoding=\"utf-8\"\n ) as f:\n namespace = f.read().strip()\n\ - \ print(f\"The pod is running in the namespace: {namespace}\")\n\n\ - \ try:\n kubernetes.config.load_kube_config()\n print(\"\ - Loaded kube config\")\n except kubernetes.config.ConfigException:\n \ - \ print(\"Failed to load kube config. Trying in-cluster config\")\n\ - \ kubernetes.config.load_incluster_config()\n\n api = kubernetes.client.CustomObjectsApi()\n\ - \ try:\n api.create_namespaced_custom_object(\n group=\"\ - kubeflow.org\",\n version=\"v1\",\n namespace=namespace,\n\ - \ plural=\"pytorchjobs\",\n body=manifest_yaml,\n\ - \ )\n except kubernetes.client.rest.ApiException as exc:\n \ - \ if exc.status == 409:\n print(\n \"{} '{}/{}'\ - \ already exists.\".format(\n manifest_yaml[\"kind\"\ - ],\n namespace,\n manifest_yaml[\"\ - metadata\"][\"name\"],\n )\n )\n else:\n\ - \ raise\n\n # Get the CR status and wait for it to be completed\n\ - \ w = kubernetes.watch.Watch()\n exit_flag = False\n start_time\ - \ = time.time()\n timeout_seconds = 24 * 60 * 60 # 24 hours\n\n while\ - \ not exit_flag: # Keep the watch active\n if time.time() - start_time\ - \ > timeout_seconds:\n raise RuntimeError(\n \"\ - Timeout (24h) reached waiting for the PytorchJob to complete.\"\n \ - \ )\n\n try:\n print(\"Watching for PytorchJob\"\ - )\n for event in w.stream(\n api.list_namespaced_custom_object,\n\ - \ group=\"kubeflow.org\",\n version=\"v1\"\ - ,\n namespace=namespace,\n plural=\"pytorchjobs\"\ - ,\n timeout_seconds=60, # Timeout after 1 minute\n \ - \ ):\n pytorchjob_event = event[\"object\"]\n \ - \ if (\n pytorchjob_event[\"metadata\"][\"\ - name\"]\n != manifest_yaml[\"metadata\"][\"name\"]\n\ - \ ):\n continue\n pytorchjob_name\ - \ = pytorchjob_event[\"metadata\"][\"name\"]\n\n if (\n \ - \ \"status\" not in pytorchjob_event\n \ - \ or \"conditions\" not in pytorchjob_event[\"status\"]\n \ - \ ):\n continue\n print(\n \ - \ f\"PytorchJob: {pytorchjob_name} - {pytorchjob_event['status'].get('conditions',\ - \ 'No conditions yet')}\"\n )\n for job_condition\ - \ in reversed(pytorchjob_event[\"status\"][\"conditions\"]):\n \ - \ if job_condition[\"type\"] == \"Succeeded\":\n \ - \ print(\n f\"PytorchJob '{pytorchjob_name}'\ - \ completed successfully: {job_condition['reason']}\"\n \ - \ )\n print(f\"Training phase {phase_num}\ - \ completed.\")\n w.stop()\n \ - \ exit_flag = True\n # Break here to avoid going\ - \ into other conditions, we are done\n break\n \ - \ elif job_condition[\"type\"] == \"Failed\":\n \ - \ print(\n f\"PytorchJob '{pytorchjob_name}'\ - \ failed: {job_condition['reason']}\"\n )\n \ - \ w.stop()\n raise RuntimeError(\"\ - Job failed.\")\n except kubernetes.client.exceptions.ApiException\ - \ as e:\n print(f\"API exception occurred: {str(e)}\")\n \ - \ time.sleep(5) # Backoff before retrying\n # Catches the\ - \ following 
error:\n # urllib3.exceptions.ProtocolError: (\"Connection\ - \ broken: InvalidChunkLength\n except urllib3.exceptions.ProtocolError\ - \ as e:\n print(f\"Connection broken reconnecting the watcher\ - \ {str(e)}\")\n time.sleep(5) # Backoff before retrying\n \ - \ finally:\n w.stop()\n\n" - image: quay.io/modh/odh-generic-data-science-notebook@sha256:0efbb3ad6f8f342360cf1f002d40716a39d4c58f69163e053d5bd19b4fe732d4 + Unsupported value of {phase_num=}\")\n\n resources_per_worker = {\"nvidia.com/gpu\"\ + : nproc_per_node}\n\n name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\ + \n command = [\"/bin/sh\", \"-c\", \"--\"]\n\n master_args = [\n \ + \ f\"\"\"\n echo \"Running phase {phase_num}\"\n echo\ + \ \"Using {path_to_model} model for training\"\n echo \"Using {path_to_data}\ + \ data for training\"\n mkdir -p /output/phase_{phase_num}/model;\n\ + \ mkdir -p /output/data;\n torchrun --nnodes {nnodes} \\\n\ + \ --nproc_per_node {nproc_per_node} \\\n --node_rank\ + \ \\$(RANK) \\\n --rdzv_endpoint \\$(MASTER_ADDR):\\$(MASTER_PORT)\ + \ \\\n -m instructlab.training.main_ds \\\n --model_name_or_path={path_to_model}\ + \ \\\n --data_path={path_to_data} \\\n --output_dir=/output/phase_{phase_num}/model\ + \ \\\n --num_epochs={num_epochs} \\\n --effective_batch_size={effective_batch_size}\ + \ \\\n --learning_rate={learning_rate} \\\n --num_warmup_steps={num_warmup_steps}\ + \ \\\n --save_samples={save_samples} \\\n --log_level=INFO\ + \ \\\n --max_batch_len={max_batch_len} \\\n --seed={seed}\ + \ \\\n --cpu_offload_optimizer \\\n --cpu_offload_params_fsdp\ + \ \\\n --distributed_training_framework fsdp \\\n \ + \ --checkpoint_at_epoch\n \"\"\"\n ]\n\n worker_args =\ + \ [\n f\"\"\"\n echo \"Running phase {phase_num}\"\n \ + \ echo \"Using {path_to_model} model for training\"\n echo \"Using\ + \ {path_to_data} data for training\"\n mkdir -p /tmp/model;\n \ + \ torchrun --nnodes {nnodes} \\\n --nproc_per_node {nproc_per_node}\ + \ \\\n --node_rank \\$(RANK) \\\n --rdzv_endpoint \\$(MASTER_ADDR):\\\ + $(MASTER_PORT) \\\n -m instructlab.training.main_ds \\\n \ + \ --model_name_or_path={path_to_model} \\\n --data_path={path_to_data}\ + \ \\\n --output_dir=/tmp/model \\\n --num_epochs={num_epochs}\ + \ \\\n --effective_batch_size={effective_batch_size} \\\n \ + \ --learning_rate={learning_rate} \\\n --num_warmup_steps={num_warmup_steps}\ + \ \\\n --save_samples={save_samples} \\\n --log_level=INFO\ + \ \\\n --max_batch_len={max_batch_len} \\\n --seed={seed}\ + \ \\\n --cpu_offload_optimizer \\\n --cpu_offload_params_fsdp\ + \ \\\n --distributed_training_framework fsdp \\\n --checkpoint_at_epoch\n\ + \ \"\"\"\n ]\n\n # Set volumes\n volumes = [\n \ + \ models.V1Volume(\n name=\"input-data\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\ + \ claim_name=input_pvc_name\n ),\n ),\n\ + \ models.V1Volume(\n name=\"model\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\ + \ claim_name=model_pvc_name\n ),\n ),\n\ + \ models.V1Volume(\n name=\"output\",\n persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource(\n\ + \ claim_name=output_pvc_name\n ),\n ),\n\ + \ ]\n\n # Set volume mounts\n volume_mounts_master = [\n \ + \ models.V1VolumeMount(\n mount_path=\"/input_data\", name=\"\ + input-data\", read_only=True\n ),\n models.V1VolumeMount(mount_path=\"\ + /input_model\", name=\"model\", read_only=True),\n models.V1VolumeMount(mount_path=\"\ + /output\", name=\"output\"),\n ]\n\n volume_mounts_worker = [\n \ + \ models.V1VolumeMount(\n 
mount_path=\"/input_data\", name=\"\ + input-data\", read_only=True\n ),\n models.V1VolumeMount(mount_path=\"\ + /input_model\", name=\"model\", read_only=True),\n models.V1VolumeMount(mount_path=\"\ + /output\", name=\"output\", read_only=True),\n ]\n\n # Set env variables\n\ + \ env_vars = [\n models.V1EnvVar(name=\"NNODES\", value=f\"{nnodes}\"\ + ),\n models.V1EnvVar(name=\"NPROC_PER_NODE\", value=f\"{nproc_per_node}\"\ + ),\n models.V1EnvVar(name=\"XDG_CACHE_HOME\", value=\"/tmp\"),\n\ + \ models.V1EnvVar(name=\"TRITON_CACHE_DIR\", value=\"/tmp\"),\n \ + \ models.V1EnvVar(name=\"HF_HOME\", value=\"/tmp\"),\n models.V1EnvVar(name=\"\ + TRANSFORMERS_CACHE\", value=\"/tmp\"),\n ]\n\n # Get master and worker\ + \ container specs\n master_container_spec = kfto_utils.get_container_spec(\n\ + \ base_image=base_image,\n name=\"pytorch\",\n resources=resources_per_worker,\n\ + \ volume_mounts=volume_mounts_master,\n )\n\n # In the next\ + \ release of kubeflow-training, the command\n # and the args will be\ + \ a part of kfto_utils.get_container_spec function\n master_container_spec.command\ + \ = command\n master_container_spec.args = master_args\n\n master_container_spec.env\ + \ = env_vars\n\n worker_container_spec = kfto_utils.get_container_spec(\n\ + \ base_image=base_image,\n name=\"pytorch\",\n resources=resources_per_worker,\n\ + \ volume_mounts=volume_mounts_worker,\n )\n worker_container_spec.command\ + \ = command\n worker_container_spec.args = worker_args\n worker_container_spec.env\ + \ = env_vars\n\n # create master pod spec\n master_pod_template_spec\ + \ = kfto_utils.get_pod_template_spec(\n containers=[master_container_spec],\n\ + \ volumes=volumes,\n )\n\n # create worker pod spec\n worker_pod_template_spec\ + \ = kfto_utils.get_pod_template_spec(\n containers=[worker_container_spec],\n\ + \ volumes=volumes,\n )\n\n logging.getLogger(__name__).setLevel(logging.INFO)\n\ + \ logging.info(\"Generating job template.\")\n logging.info(\"Creating\ + \ TrainingClient.\")\n\n # Initialize training client\n # This also\ + \ finds the namespace from /var/run/secrets/kubernetes.io/serviceaccount/namespace\n\ + \ # And it also loads the kube config\n training_client = TrainingClient()\n\ + \ namespace = training_client.namespace\n\n # Create pytorch job spec\n\ + \ job_template = kfto_utils.get_pytorchjob_template(\n name=name,\n\ + \ namespace=namespace,\n worker_pod_template_spec=worker_pod_template_spec,\n\ + \ master_pod_template_spec=master_pod_template_spec,\n num_workers=nnodes,\n\ + \ num_procs_per_worker=nproc_per_node,\n )\n # Save the pytorch\ + \ job yaml for record keeping and debugging\n with open(pytorchjob_output_yaml.path,\ + \ \"w\", encoding=\"utf-8\") as f:\n f.write(job_template.to_str())\n\ + \n # Run the pytorch job\n logging.info(f\"Creating PyTorchJob in\ + \ namespace: {namespace}\")\n training_client.create_job(job_template,\ + \ namespace=namespace)\n\n expected_conditions = [\"Succeeded\", \"Failed\"\ + ]\n logging.info(f\"Monitoring job until status is any of {expected_conditions}.\"\ + )\n\n def get_logs(job):\n _, _ = training_client.get_job_logs(name=job.metadata.name,\ + \ follow=True)\n\n training_client.wait_for_job_conditions(\n \ + \ name=name,\n expected_conditions=set(expected_conditions),\n \ + \ wait_timeout=job_timeout,\n timeout=job_timeout,\n \ + \ callback=get_logs,\n )\n\n if delete_after_done:\n logging.info(\"\ + Deleting job after completion.\")\n training_client.delete_job(name,\ + \ namespace)\n\n" + image: 
quay.io/modh/odh-generic-data-science-notebook@sha256:72c1d095adbda216a1f1b4b6935e3e2c717cbc58964009464ccd36c0b98312b2 exec-run-final-eval-op: container: args: @@ -1853,10 +1737,10 @@ root: constant: /output/mt_bench_data.json taskInfo: name: pvc-to-mt-bench-op - pytorchjob-manifest-op: + pytorch-job-launcher-op: cachingOptions: {} componentRef: - name: comp-pytorchjob-manifest-op + name: comp-pytorch-job-launcher-op dependentTasks: - createpvc - createpvc-2 @@ -1865,6 +1749,9 @@ root: - model-to-pvc-op inputs: parameters: + base_image: + runtimeValue: + constant: registry.redhat.io/rhelai1/instructlab-nvidia-rhel9@sha256:05cfba1fb13ed54b1de4d021da2a31dd78ba7d8cc48e10c7fe372815899a18ae effective_batch_size: componentInputParameter: train_effective_batch_size_phase_1 input_pvc_name: @@ -1903,18 +1790,21 @@ root: seed: componentInputParameter: train_seed taskInfo: - name: pytorchjob-manifest-op - pytorchjob-manifest-op-2: + name: pytorch-job-launcher-op + pytorch-job-launcher-op-2: cachingOptions: {} componentRef: - name: comp-pytorchjob-manifest-op-2 + name: comp-pytorch-job-launcher-op-2 dependentTasks: - createpvc - createpvc-2 - createpvc-3 - - pytorchjob-manifest-op + - pytorch-job-launcher-op inputs: parameters: + base_image: + runtimeValue: + constant: registry.redhat.io/rhelai1/instructlab-nvidia-rhel9@sha256:05cfba1fb13ed54b1de4d021da2a31dd78ba7d8cc48e10c7fe372815899a18ae effective_batch_size: componentInputParameter: train_effective_batch_size_phase_2 input_pvc_name: @@ -1953,7 +1843,7 @@ root: seed: componentInputParameter: train_seed taskInfo: - name: pytorchjob-manifest-op-2 + name: pytorch-job-launcher-op-2 run-final-eval-op: cachingOptions: {} componentRef: @@ -1991,7 +1881,7 @@ root: name: comp-run-mt-bench-op dependentTasks: - createpvc-3 - - pytorchjob-manifest-op-2 + - pytorch-job-launcher-op-2 inputs: parameters: max_workers: @@ -2273,7 +2163,7 @@ platforms: taskOutputParameter: outputParameterKey: name producerTask: createpvc-3 - exec-pytorchjob-manifest-op-2: + exec-pytorch-job-launcher-op-2: pvcMount: - mountPath: /output taskOutputParameter: diff --git a/training/__init__.py b/training/__init__.py index 22770e46..470c3a76 100644 --- a/training/__init__.py +++ b/training/__init__.py @@ -1,13 +1,13 @@ from .components import ( data_processing_op, knowledge_processed_data_to_artifact_op, - pytorchjob_manifest_op, + pytorch_job_launcher_op, skills_processed_data_to_artifact_op, ) __all__ = [ "data_processing_op", - "pytorchjob_manifest_op", + "pytorch_job_launcher_op", "skills_processed_data_to_artifact_op", "knowledge_processed_data_to_artifact_op", ] diff --git a/training/components.py b/training/components.py index 917f2a24..852b7412 100644 --- a/training/components.py +++ b/training/components.py @@ -1,5 +1,5 @@ # type: ignore -# pylint: disable=import-outside-toplevel,import-error +# pylint: disable=import-outside-toplevel,missing-function-docstring from typing import Optional @@ -8,7 +8,10 @@ from utils.consts import PYTHON_IMAGE, RHELAI_IMAGE, TOOLBOX_IMAGE -@dsl.component(base_image=RHELAI_IMAGE, install_kfp_package=False) +@dsl.component( + base_image=RHELAI_IMAGE, + install_kfp_package=False, +) def data_processing_op( model_path: str = "/model", sdg_path: str = "/data/sdg", @@ -120,14 +123,16 @@ def knowledge_processed_data_to_artifact_op( ) +# Change base image to the RHOAI python image with kubeflow_training once available @dsl.component(base_image=PYTHON_IMAGE, install_kfp_package=False) -def pytorchjob_manifest_op( +def pytorch_job_launcher_op( + 
pytorchjob_output_yaml: dsl.Output[dsl.Artifact], model_pvc_name: str, input_pvc_name: str, output_pvc_name: str, name_suffix: str, - # path_to_model: str, phase_num: int, + base_image: str, nproc_per_node: int = 3, nnodes: int = 2, num_epochs: int = 2, @@ -137,27 +142,25 @@ def pytorchjob_manifest_op( save_samples: int = 0, max_batch_len: int = 20000, seed: int = 42, + job_timeout: int = 86400, + delete_after_done: bool = False, ): - import inspect + import logging import os - import time - import kubernetes - import urllib3 - import yaml + from kubeflow.training import TrainingClient, models + from kubeflow.training.utils import utils as kfto_utils def list_phase1_final_model(): model_dir = "/output/phase_1/model/hf_format" - models = os.listdir(model_dir) + model_list = os.listdir(model_dir) newest_idx = max( (os.path.getmtime(f"{model_dir}/{model}"), i) - for i, model in enumerate(models) + for i, model in enumerate(model_list) )[-1] - newest_model = models[newest_idx] + newest_model = model_list[newest_idx] return f"{model_dir}/{newest_model}" - name = f"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}" - if phase_num == 1: path_to_model = "/input_model" path_to_data = "/input_data/knowledge/data.jsonl" @@ -167,279 +170,197 @@ def list_phase1_final_model(): else: raise RuntimeError(f"Unsupported value of {phase_num=}") - image = "registry.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1" + resources_per_worker = {"nvidia.com/gpu": nproc_per_node} + + name = f"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}" + command = ["/bin/sh", "-c", "--"] + + master_args = [ + f""" + echo "Running phase {phase_num}" + echo "Using {path_to_model} model for training" + echo "Using {path_to_data} data for training" + mkdir -p /output/phase_{phase_num}/model; + mkdir -p /output/data; + torchrun --nnodes {nnodes} \ + --nproc_per_node {nproc_per_node} \ + --node_rank \$(RANK) \ + --rdzv_endpoint \$(MASTER_ADDR):\$(MASTER_PORT) \ + -m instructlab.training.main_ds \ + --model_name_or_path={path_to_model} \ + --data_path={path_to_data} \ + --output_dir=/output/phase_{phase_num}/model \ + --num_epochs={num_epochs} \ + --effective_batch_size={effective_batch_size} \ + --learning_rate={learning_rate} \ + --num_warmup_steps={num_warmup_steps} \ + --save_samples={save_samples} \ + --log_level=INFO \ + --max_batch_len={max_batch_len} \ + --seed={seed} \ + --cpu_offload_optimizer \ + --cpu_offload_params_fsdp \ + --distributed_training_framework fsdp \ + --checkpoint_at_epoch + """ + ] - manifest = inspect.cleandoc( + worker_args = [ f""" - apiVersion: kubeflow.org/v1 - kind: PyTorchJob - metadata: - name: {name} - spec: - nprocPerNode: \"{nproc_per_node}\" - pytorchReplicaSpecs: - Master: - replicas: 1 - restartPolicy: OnFailure - template: - metadata: - annotations: - sidecar.istio.io/inject: 'false' - spec: - containers: - - args: - - | - echo "Running phase {phase_num}" - echo "Using {path_to_model} model for training" - echo "Using {path_to_data} data for training" - mkdir -p /output/phase_{phase_num}/model; - mkdir -p /output/data; - torchrun --nnodes {nnodes} \ - --nproc_per_node {nproc_per_node} \ - --node_rank \$(RANK) \ - --rdzv_endpoint \$(MASTER_ADDR):\$(MASTER_PORT) \ - -m instructlab.training.main_ds \ - --model_name_or_path={path_to_model} \ - --data_path={path_to_data} \ - --output_dir=/output/phase_{phase_num}/model \ - --num_epochs={num_epochs} \ - --effective_batch_size={effective_batch_size} \ - --learning_rate={learning_rate} \ - --num_warmup_steps={num_warmup_steps} \ - 
--save_samples={save_samples} \ - --log_level=INFO \ - --max_batch_len={max_batch_len} \ - --seed={seed} \ - --cpu_offload_optimizer \ - --cpu_offload_params_fsdp \ - --distributed_training_framework fsdp \ - --checkpoint_at_epoch - command: - - /bin/bash - - '-c' - - '--' - image: {image} - name: pytorch - volumeMounts: - - mountPath: /input_data - name: input-data - readOnly: true - - mountPath: /input_model - name: model - readOnly: true - - mountPath: /output - name: output - env: - - name: NNODES - value: \"{nnodes}\" - - name: NPROC_PER_NODE - value: \"{nproc_per_node}\" - - name: XDG_CACHE_HOME - value: /tmp - - name: TRITON_CACHE_DIR - value: /tmp - - name: HF_HOME - value: /tmp - - name: TRANSFORMERS_CACHE - value: /tmp - resources: - requests: - "nvidia.com/gpu": {nproc_per_node} - limits: - "nvidia.com/gpu": {nproc_per_node} - volumes: - - name: input-data - persistentVolumeClaim: - claimName: {input_pvc_name} - - name: model - persistentVolumeClaim: - claimName: {model_pvc_name} - - name: output - persistentVolumeClaim: - claimName: {output_pvc_name} - Worker: - replicas: {nnodes-1} - restartPolicy: OnFailure - template: - metadata: - annotations: - sidecar.istio.io/inject: 'false' - spec: - containers: - - args: - - | - echo "Running phase {phase_num}" - echo "Using {path_to_model} model for training" - echo "Using {path_to_data} data for training" - mkdir -p /tmp/model; - torchrun --nnodes {nnodes} \ - --nproc_per_node {nproc_per_node} \ - --node_rank \$(RANK) \ - --rdzv_endpoint \$(MASTER_ADDR):\$(MASTER_PORT) \ - -m instructlab.training.main_ds \ - --model_name_or_path={path_to_model} \ - --data_path={path_to_data} \ - --output_dir=/tmp/model \ - --num_epochs={num_epochs} \ - --effective_batch_size={effective_batch_size} \ - --learning_rate={learning_rate} \ - --num_warmup_steps={num_warmup_steps} \ - --save_samples={save_samples} \ - --log_level=INFO \ - --max_batch_len={max_batch_len} \ - --seed={seed} \ - --cpu_offload_optimizer \ - --cpu_offload_params_fsdp \ - --distributed_training_framework fsdp \ - --checkpoint_at_epoch - command: - - /bin/bash - - '-c' - - '--' - image: {image} - name: pytorch - volumeMounts: - - mountPath: /input_data - name: input-data - readOnly: true - - mountPath: /input_model - name: model - readOnly: true - - mountPath: /output - name: output - readOnly: true - env: - - name: NNODES - value: \"{nnodes}\" - - name: NPROC_PER_NODE - value: \"{nproc_per_node}\" - - name: XDG_CACHE_HOME - value: /tmp - - name: TRITON_CACHE_DIR - value: /tmp - - name: HF_HOME - value: /tmp - - name: TRANSFORMERS_CACHE - value: /tmp - resources: - requests: - "nvidia.com/gpu": {nproc_per_node} - limits: - "nvidia.com/gpu": {nproc_per_node} - volumes: - - name: input-data - persistentVolumeClaim: - claimName: {input_pvc_name} - - name: model - persistentVolumeClaim: - claimName: {model_pvc_name} - - name: output - persistentVolumeClaim: - claimName: {output_pvc_name} - """ + echo "Running phase {phase_num}" + echo "Using {path_to_model} model for training" + echo "Using {path_to_data} data for training" + mkdir -p /tmp/model; + torchrun --nnodes {nnodes} \ + --nproc_per_node {nproc_per_node} \ + --node_rank \$(RANK) \ + --rdzv_endpoint \$(MASTER_ADDR):\$(MASTER_PORT) \ + -m instructlab.training.main_ds \ + --model_name_or_path={path_to_model} \ + --data_path={path_to_data} \ + --output_dir=/tmp/model \ + --num_epochs={num_epochs} \ + --effective_batch_size={effective_batch_size} \ + --learning_rate={learning_rate} \ + --num_warmup_steps={num_warmup_steps} \ + 
--save_samples={save_samples} \ + --log_level=INFO \ + --max_batch_len={max_batch_len} \ + --seed={seed} \ + --cpu_offload_optimizer \ + --cpu_offload_params_fsdp \ + --distributed_training_framework fsdp \ + --checkpoint_at_epoch + """ + ] + + # Set volumes + volumes = [ + models.V1Volume( + name="input-data", + persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource( + claim_name=input_pvc_name + ), + ), + models.V1Volume( + name="model", + persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource( + claim_name=model_pvc_name + ), + ), + models.V1Volume( + name="output", + persistent_volume_claim=models.V1PersistentVolumeClaimVolumeSource( + claim_name=output_pvc_name + ), + ), + ] + + # Set volume mounts + volume_mounts_master = [ + models.V1VolumeMount( + mount_path="/input_data", name="input-data", read_only=True + ), + models.V1VolumeMount(mount_path="/input_model", name="model", read_only=True), + models.V1VolumeMount(mount_path="/output", name="output"), + ] + + volume_mounts_worker = [ + models.V1VolumeMount( + mount_path="/input_data", name="input-data", read_only=True + ), + models.V1VolumeMount(mount_path="/input_model", name="model", read_only=True), + models.V1VolumeMount(mount_path="/output", name="output", read_only=True), + ] + + # Set env variables + env_vars = [ + models.V1EnvVar(name="NNODES", value=f"{nnodes}"), + models.V1EnvVar(name="NPROC_PER_NODE", value=f"{nproc_per_node}"), + models.V1EnvVar(name="XDG_CACHE_HOME", value="/tmp"), + models.V1EnvVar(name="TRITON_CACHE_DIR", value="/tmp"), + models.V1EnvVar(name="HF_HOME", value="/tmp"), + models.V1EnvVar(name="TRANSFORMERS_CACHE", value="/tmp"), + ] + + # Get master and worker container specs + master_container_spec = kfto_utils.get_container_spec( + base_image=base_image, + name="pytorch", + resources=resources_per_worker, + volume_mounts=volume_mounts_master, ) - try: - manifest_yaml = yaml.safe_load(manifest) - except yaml.YAMLError as exc: - raise RuntimeError(f"Error parsing manifest: {exc}") from exc - - # Discover the namespace in which the pod is running - with open( - "/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r", encoding="utf-8" - ) as f: - namespace = f.read().strip() - print(f"The pod is running in the namespace: {namespace}") - - try: - kubernetes.config.load_kube_config() - print("Loaded kube config") - except kubernetes.config.ConfigException: - print("Failed to load kube config. Trying in-cluster config") - kubernetes.config.load_incluster_config() - - api = kubernetes.client.CustomObjectsApi() - try: - api.create_namespaced_custom_object( - group="kubeflow.org", - version="v1", - namespace=namespace, - plural="pytorchjobs", - body=manifest_yaml, - ) - except kubernetes.client.rest.ApiException as exc: - if exc.status == 409: - print( - "{} '{}/{}' already exists.".format( - manifest_yaml["kind"], - namespace, - manifest_yaml["metadata"]["name"], - ) - ) - else: - raise - - # Get the CR status and wait for it to be completed - w = kubernetes.watch.Watch() - exit_flag = False - start_time = time.time() - timeout_seconds = 24 * 60 * 60 # 24 hours - - while not exit_flag: # Keep the watch active - if time.time() - start_time > timeout_seconds: - raise RuntimeError( - "Timeout (24h) reached waiting for the PytorchJob to complete." 
- ) + # In the next release of kubeflow-training, the command + # and the args will be a part of kfto_utils.get_container_spec function + master_container_spec.command = command + master_container_spec.args = master_args + + master_container_spec.env = env_vars + + worker_container_spec = kfto_utils.get_container_spec( + base_image=base_image, + name="pytorch", + resources=resources_per_worker, + volume_mounts=volume_mounts_worker, + ) + worker_container_spec.command = command + worker_container_spec.args = worker_args + worker_container_spec.env = env_vars + + # create master pod spec + master_pod_template_spec = kfto_utils.get_pod_template_spec( + containers=[master_container_spec], + volumes=volumes, + ) + + # create worker pod spec + worker_pod_template_spec = kfto_utils.get_pod_template_spec( + containers=[worker_container_spec], + volumes=volumes, + ) + + logging.getLogger(__name__).setLevel(logging.INFO) + logging.info("Generating job template.") + logging.info("Creating TrainingClient.") + + # Initialize training client + # This also finds the namespace from /var/run/secrets/kubernetes.io/serviceaccount/namespace + # And it also loads the kube config + training_client = TrainingClient() + namespace = training_client.namespace + + # Create pytorch job spec + job_template = kfto_utils.get_pytorchjob_template( + name=name, + namespace=namespace, + worker_pod_template_spec=worker_pod_template_spec, + master_pod_template_spec=master_pod_template_spec, + num_workers=nnodes, + num_procs_per_worker=nproc_per_node, + ) + # Save the pytorch job yaml for record keeping and debugging + with open(pytorchjob_output_yaml.path, "w", encoding="utf-8") as f: + f.write(job_template.to_str()) + + # Run the pytorch job + logging.info(f"Creating PyTorchJob in namespace: {namespace}") + training_client.create_job(job_template, namespace=namespace) + + expected_conditions = ["Succeeded", "Failed"] + logging.info(f"Monitoring job until status is any of {expected_conditions}.") + + def get_logs(job): + _, _ = training_client.get_job_logs(name=job.metadata.name, follow=True) + + training_client.wait_for_job_conditions( + name=name, + expected_conditions=set(expected_conditions), + wait_timeout=job_timeout, + timeout=job_timeout, + callback=get_logs, + ) - try: - print("Watching for PytorchJob") - for event in w.stream( - api.list_namespaced_custom_object, - group="kubeflow.org", - version="v1", - namespace=namespace, - plural="pytorchjobs", - timeout_seconds=60, # Timeout after 1 minute - ): - pytorchjob_event = event["object"] - if ( - pytorchjob_event["metadata"]["name"] - != manifest_yaml["metadata"]["name"] - ): - continue - pytorchjob_name = pytorchjob_event["metadata"]["name"] - - if ( - "status" not in pytorchjob_event - or "conditions" not in pytorchjob_event["status"] - ): - continue - print( - f"PytorchJob: {pytorchjob_name} - {pytorchjob_event['status'].get('conditions', 'No conditions yet')}" - ) - for job_condition in reversed(pytorchjob_event["status"]["conditions"]): - if job_condition["type"] == "Succeeded": - print( - f"PytorchJob '{pytorchjob_name}' completed successfully: {job_condition['reason']}" - ) - print(f"Training phase {phase_num} completed.") - w.stop() - exit_flag = True - # Break here to avoid going into other conditions, we are done - break - elif job_condition["type"] == "Failed": - print( - f"PytorchJob '{pytorchjob_name}' failed: {job_condition['reason']}" - ) - w.stop() - raise RuntimeError("Job failed.") - except kubernetes.client.exceptions.ApiException as e: - 
print(f"API exception occurred: {str(e)}") - time.sleep(5) # Backoff before retrying - # Catches the following error: - # urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength - except urllib3.exceptions.ProtocolError as e: - print(f"Connection broken reconnecting the watcher {str(e)}") - time.sleep(5) # Backoff before retrying - finally: - w.stop() + if delete_after_done: + logging.info("Deleting job after completion.") + training_client.delete_job(name, namespace) diff --git a/utils/consts.py b/utils/consts.py index 0087f2d4..04f271c7 100644 --- a/utils/consts.py +++ b/utils/consts.py @@ -1,4 +1,4 @@ -PYTHON_IMAGE = "quay.io/modh/odh-generic-data-science-notebook@sha256:0efbb3ad6f8f342360cf1f002d40716a39d4c58f69163e053d5bd19b4fe732d4" # v3-2024b-20250115 +PYTHON_IMAGE = "quay.io/modh/odh-generic-data-science-notebook@sha256:72c1d095adbda216a1f1b4b6935e3e2c717cbc58964009464ccd36c0b98312b2" # v3-20250116 TOOLBOX_IMAGE = "registry.redhat.io/ubi9/toolbox@sha256:da31dee8904a535d12689346e65e5b00d11a6179abf1fa69b548dbd755fa2770" # v9.5 OC_IMAGE = "registry.redhat.io/openshift4/ose-cli@sha256:08bdbfae224dd39c81689ee73c183619d6b41eba7ac04f0dce7ee79f50531d0b" # v4.15.0 RHELAI_IMAGE = "registry.redhat.io/rhelai1/instructlab-nvidia-rhel9@sha256:05cfba1fb13ed54b1de4d021da2a31dd78ba7d8cc48e10c7fe372815899a18ae" # v1.3.2