Use knowledge data for phase 1 training and skills data for phase 2 training in standalone script #113

Merged: 1 commit, Oct 18, 2024
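In short: data_processing_op previously emitted a single processed_data dataset; this change splits it into skills_processed_data and knowledge_processed_data, copies each to the shared PVC with its own artifact_to_pvc_op task, and updates the standalone-script generator to match, so that knowledge data can drive phase 1 training and skills data phase 2. Reconstructed from the diff below, the component's new signature looks like this (a sketch of the function signature only; any component decorator is outside this diff):

def data_processing_op(
    sdg: dsl.Input[dsl.Dataset],
    skills_processed_data: dsl.Output[dsl.Dataset],       # new output
    knowledge_processed_data: dsl.Output[dsl.Dataset],    # new output
    model: dsl.Input[dsl.Artifact],
    max_seq_len: Optional[int] = 4096,
    max_batch_len: Optional[int] = 20000,
): ...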
pipeline.py (28 changes: 21 additions & 7 deletions)
@@ -146,12 +146,26 @@ def pipeline(
size="1Gi",
storage_class_name=storage_class_name,
)
sdg_to_pvc_task = artifact_to_pvc_op(
data=data_processing_task.outputs["processed_data"], pvc_path="/data"

sdg_skills_to_pvc_task = artifact_to_pvc_op(
data=data_processing_task.outputs["skills_processed_data"], pvc_path="/data"
)
sdg_to_pvc_task.set_caching_options(False)
sdg_skills_to_pvc_task.set_caching_options(False)
mount_pvc(
task=sdg_to_pvc_task, pvc_name=sdg_input_pvc_task.output, mount_path="/data"
task=sdg_skills_to_pvc_task,
pvc_name=sdg_input_pvc_task.output,
mount_path="/data",
)

sdg_knowledge_to_pvc_task = artifact_to_pvc_op(
data=data_processing_task.outputs["knowledge_processed_data"],
pvc_path="/data",
)
sdg_knowledge_to_pvc_task.set_caching_options(False)
mount_pvc(
task=sdg_knowledge_to_pvc_task,
pvc_name=sdg_input_pvc_task.output,
mount_path="/data",
)

output_pvc_task = CreatePVC(
@@ -177,7 +191,7 @@ def pipeline(
kubectl_apply_task = kubectl_apply_op(
manifest=pytorchjob_manifest_task.outputs["manifest"]
)
kubectl_apply_task.after(sdg_to_pvc_task, model_to_pvc_task)
kubectl_apply_task.after(sdg_knowledge_to_pvc_task, model_to_pvc_task)
kubectl_apply_task.set_caching_options(False)

kubectl_wait_task = kubectl_wait_for_op(
@@ -255,7 +269,7 @@ def pipeline(
kubectl_apply_2_task = kubectl_apply_op(
manifest=pytorchjob_manifest_2_task.outputs["manifest"]
)
kubectl_apply_2_task.after(sdg_to_pvc_task, model_to_pvc_task)
kubectl_apply_2_task.after(sdg_knowledge_to_pvc_task, model_to_pvc_task)
kubectl_apply_2_task.set_caching_options(False)

kubectl_wait_2_task = kubectl_wait_for_op(
@@ -445,7 +459,7 @@ def gen_standalone():

# The list of executor names to extract details from to generate the standalone script
executors = {
"exec-data-processing-op": 'data_processing_op(max_seq_len={MAX_SEQ_LEN}, max_batch_len={MAX_BATCH_LEN}, sdg="{DATA_PVC_SDG_PATH}", model="{DATA_PVC_MODEL_PATH}", processed_data="{PREPROCESSED_DATA_PATH}")',
"exec-data-processing-op": 'data_processing_op(max_seq_len={MAX_SEQ_LEN}, max_batch_len={MAX_BATCH_LEN}, sdg="{DATA_PVC_SDG_PATH}", model="{DATA_PVC_MODEL_PATH}", skills_processed_data="{PREPROCESSED_DATA_PATH_SKILLS}", knowledge_processed_data="{PREPROCESSED_DATA_PATH_KNOWLEDGE}")',
"exec-sdg-op": 'sdg_op(num_instructions_to_generate={num_instructions_to_generate}, repo_branch="{exec_git_clone_op_repo_branch}", repo_pr={exec_git_clone_op_repo_pr}, taxonomy="{TAXONOMY_DATA_PATH}", sdg="{SDG_GENERATED_DATA_PATH}")',
"exec-git-clone-op": {},
"exec-huggingface-importer-op": 'huggingface_importer_op(repo_name="{REPO_GRANITE_7B_IMAGE}", model="{DATA_PVC_MODEL_PATH}")',
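The updated exec-data-processing-op template above now passes both output paths. As a purely illustrative rendering (the placeholder constants such as PREPROCESSED_DATA_PATH_SKILLS are defined elsewhere in the standalone script; every concrete path below is an assumption, not a value taken from this PR):

# Hypothetical expansion of the exec-data-processing-op template;
# all paths are assumed placeholder values for illustration only.
data_processing_op(
    max_seq_len=4096,
    max_batch_len=20000,
    sdg="/data/sdg",                                            # {DATA_PVC_SDG_PATH}
    model="/data/model",                                        # {DATA_PVC_MODEL_PATH}
    skills_processed_data="/data/processed_data_skills",        # {PREPROCESSED_DATA_PATH_SKILLS}
    knowledge_processed_data="/data/processed_data_knowledge",  # {PREPROCESSED_DATA_PATH_KNOWLEDGE}
)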
pipeline.yaml (90 changes: 74 additions & 16 deletions)
@@ -39,6 +39,17 @@ components:
parameters:
pvc_path:
parameterType: STRING
comp-artifact-to-pvc-op-3:
executorLabel: exec-artifact-to-pvc-op-3
inputDefinitions:
artifacts:
data:
artifactType:
schemaTitle: system.Artifact
schemaVersion: 0.0.1
parameters:
pvc_path:
parameterType: STRING
comp-createpvc:
executorLabel: exec-createpvc
inputDefinitions:
@@ -245,7 +256,11 @@ components:
parameterType: NUMBER_INTEGER
outputDefinitions:
artifacts:
processed_data:
knowledge_processed_data:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
skills_processed_data:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
@@ -539,6 +554,14 @@ deploymentSpec:
- /bin/sh
- -c
image: registry.access.redhat.com/ubi9/toolbox
exec-artifact-to-pvc-op-3:
container:
args:
- cp -r {{$.inputs.artifacts['data'].path}} {{$.inputs.parameters['pvc_path']}}
command:
- /bin/sh
- -c
image: registry.access.redhat.com/ubi9/toolbox
exec-createpvc:
container:
image: argostub/createpvc
Expand Down Expand Up @@ -575,16 +598,26 @@ deploymentSpec:

'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef data_processing_op(\n sdg: dsl.Input[dsl.Dataset],\n processed_data:\
\ dsl.Output[dsl.Dataset],\n model: dsl.Input[dsl.Artifact],\n max_seq_len:\
\ Optional[int] = 4096,\n max_batch_len: Optional[int] = 20000,\n):\n\
\ import os\n\n import instructlab.training.data_process as dp\n \
\ from instructlab.training import (\n DataProcessArgs,\n \
\ TrainingArgs,\n )\n\n # define training-specific arguments\n \
\ training_args = TrainingArgs(\n # define data-specific arguments\n\
\ model_path=model.path,\n data_path=f\"{sdg.path}/*_train_msgs*.jsonl\"\
,\n data_output_dir=processed_data.path,\n # define model-trianing\
\ parameters\n max_seq_len=max_seq_len,\n max_batch_len=max_batch_len,\n\
\ *\n\ndef data_processing_op(\n sdg: dsl.Input[dsl.Dataset],\n skills_processed_data:\
\ dsl.Output[dsl.Dataset],\n knowledge_processed_data: dsl.Output[dsl.Dataset],\n\
\ model: dsl.Input[dsl.Artifact],\n max_seq_len: Optional[int] = 4096,\n\
\ max_batch_len: Optional[int] = 20000,\n):\n import os\n\n import\
\ instructlab.training.data_process as dp\n from instructlab.training\
\ import (\n DataProcessArgs,\n TrainingArgs,\n )\n\n \
\ # define training-specific arguments\n skill_training_args = TrainingArgs(\n\
\ # define data-specific arguments\n model_path=model.path,\n\
\ data_path=f\"{sdg.path}/skills_train_msgs*.jsonl\",\n data_output_dir=skills_processed_data.path,\n\
\ # define model-trianing parameters\n max_seq_len=max_seq_len,\n\
\ max_batch_len=max_batch_len,\n # XXX(shanand): We don't\
\ need the following arguments\n # for data processing. Added them\
\ for now to avoid\n # Pydantic validation errors for TrainingArgs\n\
\ ckpt_output_dir=\"data/saved_checkpoints\",\n num_epochs=2,\n\
\ effective_batch_size=3840,\n save_samples=0,\n learning_rate=2e-6,\n\
\ warmup_steps=800,\n is_padding_free=True,\n )\n\n \
\ knowledge_training_args = TrainingArgs(\n # define data-specific\
\ arguments\n model_path=model.path,\n data_path=f\"{sdg.path}/knowledge_train_msgs*.jsonl\"\
,\n data_output_dir=knowledge_processed_data.path,\n # define\
\ model-trianing parameters\n max_seq_len=max_seq_len,\n max_batch_len=max_batch_len,\n\
\ # XXX(shanand): We don't need the following arguments\n \
\ # for data processing. Added them for now to avoid\n # Pydantic\
\ validation errors for TrainingArgs\n ckpt_output_dir=\"data/saved_checkpoints\"\
@@ -607,8 +640,8 @@
\ data_output_path=train_args.data_output_dir,\n \
\ model_path=train_args.model_path,\n data_path=train_args.data_path,\n\
\ max_seq_len=train_args.max_seq_len,\n chat_tmpl_path=train_args.chat_tmpl_path,\n\
\ )\n )\n\n data_processing(train_args=training_args)\n\
\n"
\ )\n )\n\n data_processing(train_args=skill_training_args)\n\
\ data_processing(train_args=knowledge_training_args)\n\n"
image: registry.access.redhat.com/ubi9/python-311:latest
exec-deletepvc:
container:
@@ -1450,14 +1483,33 @@ root:
artifacts:
data:
taskOutputArtifact:
outputArtifactKey: processed_data
outputArtifactKey: skills_processed_data
producerTask: data-processing-op
parameters:
pvc_path:
runtimeValue:
constant: /data
taskInfo:
name: artifact-to-pvc-op-2
artifact-to-pvc-op-3:
cachingOptions: {}
componentRef:
name: comp-artifact-to-pvc-op-3
dependentTasks:
- createpvc-2
- data-processing-op
inputs:
artifacts:
data:
taskOutputArtifact:
outputArtifactKey: knowledge_processed_data
producerTask: data-processing-op
parameters:
pvc_path:
runtimeValue:
constant: /data
taskInfo:
name: artifact-to-pvc-op-3
createpvc:
cachingOptions:
enableCache: true
@@ -1624,7 +1676,7 @@ root:
name: comp-kubectl-apply-op
dependentTasks:
- artifact-to-pvc-op
- artifact-to-pvc-op-2
- artifact-to-pvc-op-3
- pytorchjob-manifest-op
inputs:
parameters:
@@ -1640,7 +1692,7 @@ root:
name: comp-kubectl-apply-op-2
dependentTasks:
- artifact-to-pvc-op
- artifact-to-pvc-op-2
- artifact-to-pvc-op-3
- pytorchjob-manifest-op-2
inputs:
parameters:
@@ -1972,6 +2024,12 @@ platforms:
taskOutputParameter:
outputParameterKey: name
producerTask: createpvc-2
exec-artifact-to-pvc-op-3:
pvcMount:
- mountPath: /data
taskOutputParameter:
outputParameterKey: name
producerTask: createpvc-2
exec-list-models-in-directory-op:
pvcMount:
- mountPath: /output/model
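The escaped pythonCode string in the exec-data-processing-op hunks above is hard to read, so here is the new data_processing_op body de-escaped into plain Python for reference (reconstructed from the YAML diff; indentation is approximated, and the stretch between the two hunks is elided here just as it is in the diff):

import kfp
from kfp import dsl
from kfp.dsl import *
from typing import *

def data_processing_op(
    sdg: dsl.Input[dsl.Dataset],
    skills_processed_data: dsl.Output[dsl.Dataset],
    knowledge_processed_data: dsl.Output[dsl.Dataset],
    model: dsl.Input[dsl.Artifact],
    max_seq_len: Optional[int] = 4096,
    max_batch_len: Optional[int] = 20000,
):
    import os

    import instructlab.training.data_process as dp
    from instructlab.training import (
        DataProcessArgs,
        TrainingArgs,
    )

    # define training-specific arguments
    skill_training_args = TrainingArgs(
        # define data-specific arguments
        model_path=model.path,
        data_path=f"{sdg.path}/skills_train_msgs*.jsonl",
        data_output_dir=skills_processed_data.path,
        # define model-training parameters
        max_seq_len=max_seq_len,
        max_batch_len=max_batch_len,
        # XXX(shanand): We don't need the following arguments
        # for data processing. Added them for now to avoid
        # Pydantic validation errors for TrainingArgs
        ckpt_output_dir="data/saved_checkpoints",
        num_epochs=2,
        effective_batch_size=3840,
        save_samples=0,
        learning_rate=2e-6,
        warmup_steps=800,
        is_padding_free=True,
    )

    knowledge_training_args = TrainingArgs(
        # define data-specific arguments
        model_path=model.path,
        data_path=f"{sdg.path}/knowledge_train_msgs*.jsonl",
        data_output_dir=knowledge_processed_data.path,
        # define model-training parameters
        max_seq_len=max_seq_len,
        max_batch_len=max_batch_len,
        # XXX(shanand): We don't need the following arguments
        # for data processing. Added them for now to avoid
        # Pydantic validation errors for TrainingArgs
        ckpt_output_dir="data/saved_checkpoints",
        # ... remaining placeholder arguments elided between the two hunks ...
    )

    # ... the data_processing() helper between the hunks is unchanged and not
    # shown; the visible tail of the second hunk shows it building
    # DataProcessArgs from train_args (data_output_path, model_path,
    # data_path, max_seq_len, chat_tmpl_path) ...

    data_processing(train_args=skill_training_args)
    data_processing(train_args=knowledge_training_args)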