REMOVE ME: PIPELINE NO TRAINING
Signed-off-by: sallyom <[email protected]>
sallyom committed Oct 1, 2024
1 parent a80d7ad commit 8371844
Showing 1 changed file with 258 additions and 0 deletions.
pipeline-no-train.py
@@ -0,0 +1,258 @@
# type: ignore
# pylint: disable=no-value-for-parameter,import-outside-toplevel,import-error,no-member
from typing import List, Literal, Optional
import click
from kfp import dsl, compiler
from kfp.kubernetes import (
    use_config_map_as_env,
    use_secret_as_env,
    CreatePVC,
    DeletePVC,
    mount_pvc,
)

# For now, all external models point to the same Mistral server, but that won't always be the case
K8S_NAME = "kfp-model-server"
JUDGE_CONFIG_MAP = "kfp-model-server"
JUDGE_SECRET = "judge-server"
MOCKED_STAGES = ["sdg", "train", "eval"]

# Output PVC
OUTPUT = "5be06b0c-237f-4797-8cc1-9cada3fae38e-output"
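# NOTE (assumption): this PVC is not created or deleted by this pipeline; it appears to be
# the output volume of a previous full (training) pipeline run, with model checkpoints
# expected under model/hf_format on that volume.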

def pipeline_wrapper(mock: List[Literal[MOCKED_STAGES]]):
"""Wrapper for KFP pipeline, which allows for mocking individual stages."""

    # Imports for SDG stage
    if "sdg" in mock:
        from sdg.faked import git_clone_op, sdg_op
    else:
        from sdg import git_clone_op, sdg_op

    # Imports for Training stage
    if "train" in mock:
        # NOTE: data_processing_op is not imported in this mocked branch, yet it is
        # called unconditionally below, so compiling with --mock train will fail.
        from training.faked import pytorchjob_manifest_op
        from utils.faked import (
            kubectl_apply_op,
            kubectl_wait_for_op,
            huggingface_importer_op,
            pvc_to_artifact_op,
            pvc_to_model_op,
        )
        from utils import artifact_to_pvc_op
    else:
        from training import data_processing_op, pytorchjob_manifest_op
        from utils import (
            kubectl_apply_op,
            kubectl_wait_for_op,
            artifact_to_pvc_op,
            huggingface_importer_op,
            pvc_to_artifact_op,
            pvc_to_model_op,
        )

    # Imports for MMLU, MT_BENCH stage
    # TODO: Add mock/fake components
    from utils import list_models_in_directory_op
    from eval.mmlu import run_mmlu_op, load_mmlu_results_op
    from eval.mt_bench import run_mt_bench_op, load_mt_bench_results_op
    from eval.final import run_mmlu_branch_mt_bench_branch_op

    @dsl.pipeline(
        display_name="InstructLab",
        name="instructlab",
        description="InstructLab pipeline",
    )
    def pipeline(
        num_instructions_to_generate: int = 2,
        repo_url: str = "https://github.com/instructlab/taxonomy.git",
        repo_branch: Optional[str] = None,
        repo_pr: Optional[int] = None,
        storage_class_name: str = "nfs-csi",
        base_model: str = "ibm-granite/granite-7b-base",
        # minimal subset of MMLU_TASKS
        mmlu_tasks_list: str = "mmlu_anatomy,mmlu_astronomy",
        model_dtype: str = "bfloat16",
        few_shots: int = 5,
        batch_size: int = 8,
        merge_system_user_message: bool = False,
        device: Optional[str] = None,
    ):

        # SDG stage
        git_clone_task = git_clone_op(
            repo_branch=repo_branch,
            repo_pr=repo_pr if repo_pr and repo_pr > 0 else None,
            repo_url=repo_url,
        )

        sdg_task = sdg_op(
            num_instructions_to_generate=num_instructions_to_generate,
            taxonomy=git_clone_task.outputs["taxonomy"],
            repo_branch=repo_branch,
            repo_pr=repo_pr,
        )
        use_config_map_as_env(
            sdg_task, K8S_NAME, dict(endpoint="endpoint", model="model")
        )
        use_secret_as_env(sdg_task, K8S_NAME, {"api_key": "api_key"})


        # Training stage

        # To use the cluster's default StorageClass, storage_class_name must be passed as "".
        # If the argument is left unset, KFP falls back to the "standard" StorageClass, which
        # is not necessarily the default one:
        # https://github.com/kubeflow/pipelines/blob/1cded35cf5e93d8c8d32fefbddceb2eed8de9a0a/backend/src/v2/driver/driver.go#L1428-L1436
        # At least it is exposed as a pipeline parameter here.
        model_pvc_task = CreatePVC(
            pvc_name_suffix="-model-cache",
            access_modes=["ReadWriteMany"],
            size="100Gi",
            storage_class_name=storage_class_name,
        )
        model_to_artifact = huggingface_importer_op(repo_name=base_model)
        model_to_pvc_task = artifact_to_pvc_op(
            data=model_to_artifact.outputs["model"], pvc_path="/model"
        )
        model_to_pvc_task.set_caching_options(False)
        mount_pvc(
            task=model_to_pvc_task, pvc_name=model_pvc_task.output, mount_path="/model"
        )

        # Data processing
        data_processing_task = data_processing_op(
            sdg=sdg_task.outputs["sdg"],
            model=model_to_artifact.outputs["model"],
        )

        sdg_input_pvc_task = CreatePVC(
            pvc_name_suffix="-sdg",
            access_modes=["ReadWriteMany"],
            size="1Gi",
            storage_class_name=storage_class_name,
        )
        sdg_to_pvc_task = artifact_to_pvc_op(
            data=data_processing_task.outputs["processed_data"], pvc_path="/data"
        )
        sdg_to_pvc_task.set_caching_options(False)
        mount_pvc(
            task=sdg_to_pvc_task, pvc_name=sdg_input_pvc_task.output, mount_path="/data"
        )

        ###

        # List the model checkpoints already present on the output PVC
        models_list_2_task = list_models_in_directory_op(
            models_folder="/output/model/model/hf_format",
        )
        models_list_2_task.set_caching_options(False)

        models_list_2_task.after(sdg_to_pvc_task)

        mount_pvc(
            task=models_list_2_task, pvc_name=OUTPUT, mount_path="/output/model"
        )

        ###

        # MT_Bench Evaluation of models

        run_mt_bench_task = run_mt_bench_op(
            models_list=models_list_2_task.output,
            models_path_prefix="/output/model/hf_format",
            merge_system_user_message=merge_system_user_message,
            device=device,
        )

        mount_pvc(
            task=run_mt_bench_task, pvc_name=OUTPUT, mount_path="/output"
        )

        run_mt_bench_task.after(models_list_2_task)

        run_mt_bench_task.set_accelerator_type('nvidia.com/gpu')
        run_mt_bench_task.set_accelerator_limit(1)
        run_mt_bench_task.set_caching_options(False)

        use_config_map_as_env(
            run_mt_bench_task, JUDGE_CONFIG_MAP, dict(endpoint="JUDGE_ENDPOINT", model="JUDGE_NAME")
        )

        use_secret_as_env(run_mt_bench_task, JUDGE_SECRET, {"api_key": "JUDGE_API_KEY"})

        final_eval_task = run_mmlu_branch_mt_bench_branch_op(
            base_model=model_to_artifact.outputs["model"],
            candidate_model=run_mt_bench_task.outputs["best_model"],
            taxonomy=git_clone_task.outputs["taxonomy"],
            tasks=sdg_task.outputs["sdg"],
            base_branch=repo_branch,
            candidate_branch=repo_branch,
            merge_system_user_message=merge_system_user_message,
            model_dtype=model_dtype,
            few_shots=few_shots,
            batch_size=batch_size,
            device=device,
        )

        mount_pvc(
            task=final_eval_task, pvc_name=OUTPUT, mount_path="/output"
        )

        use_config_map_as_env(
            final_eval_task, JUDGE_CONFIG_MAP, dict(endpoint="JUDGE_ENDPOINT", model="JUDGE_NAME")
        )

        use_secret_as_env(final_eval_task, JUDGE_SECRET, {"api_key": "JUDGE_API_KEY"})

        final_eval_task.set_accelerator_type('nvidia.com/gpu')
        final_eval_task.set_accelerator_limit(1)

        # Technically `output_model_task` and `output_data_task` could run before evaluation,
        # but the PVC can only be mounted once, so they are ordered _after_ the eval tasks so
        # that evaluation proceeds first.
        # NOTE: the variable names appear swapped relative to what they export:
        # output_model_task exports /output/data as an artifact, while
        # output_data_task exports /output/model as a model.
        output_model_task = pvc_to_artifact_op(
            pvc_path="/output/data",
        )
        output_model_task.after(final_eval_task)
        output_model_task.set_caching_options(False)

        mount_pvc(
            task=output_model_task, pvc_name=OUTPUT, mount_path="/output/data"
        )

        output_data_task = pvc_to_model_op(
            pvc_path="/output/model",
        )
        output_data_task.after(run_mt_bench_task)

        mount_pvc(
            task=output_data_task, pvc_name=OUTPUT, mount_path="/output/model"
        )

        sdg_pvc_delete_task = DeletePVC(pvc_name=sdg_input_pvc_task.output)
        sdg_pvc_delete_task.after(output_data_task)

        model_pvc_delete_task = DeletePVC(pvc_name=model_pvc_task.output)
        model_pvc_delete_task.after(output_data_task)

        return


    return pipeline


@click.command()
@click.option(
"--mock",
type=click.Choice(MOCKED_STAGES, case_sensitive=False),
help="Mock part of the pipeline",
multiple=True,
default=[],
)
def cli(mock):

    p = pipeline_wrapper(mock)

    with click.progressbar(length=1, label="Generating pipeline") as bar:
        compiler.Compiler().compile(p, "pipeline.yaml")
        bar.update(1)


if __name__ == "__main__":
cli()
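
# Example invocations (a sketch; assumes this file is saved as pipeline-no-train.py):
#   python pipeline-no-train.py              # compile the full pipeline to pipeline.yaml
#   python pipeline-no-train.py --mock sdg   # compile using the faked SDG components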
