Skip to content

Commit 3a2708c

Browse files
committed
REMOVE ME: PIPELINE NO TRAINING
Signed-off-by: sallyom <[email protected]>
1 parent a80d7ad commit 3a2708c

File tree

2 files changed

+315
-1130
lines changed

2 files changed

+315
-1130
lines changed

pipeline-no-train.py

+223
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
# type: ignore
# pylint: disable=no-value-for-parameter,import-outside-toplevel,import-error,no-member
from typing import List, Literal, Optional

import click
from kfp import dsl, compiler
from kfp.kubernetes import (
    use_config_map_as_env,
    use_secret_as_env,
    CreatePVC,
    DeletePVC,
    mount_pvc,
)

# For now, all external models are the same mistral, but won't be always
# ConfigMap/Secret name providing the teacher-model endpoint for the SDG stage.
K8S_NAME = "kfp-model-server"
# ConfigMap/Secret names providing the judge-model endpoint for evaluation.
# NOTE(review): JUDGE_CONFIG_MAP intentionally reuses the same ConfigMap as
# K8S_NAME today (same mistral server) — confirm before they diverge.
JUDGE_CONFIG_MAP = "kfp-model-server"
JUDGE_SECRET = "judge-server"
# Stage names that may be replaced with faked components via --mock.
MOCKED_STAGES = ["sdg", "train", "eval"]


# Output PVC
# Hard-coded name of a pre-existing PVC holding a prior training run's output;
# mounted read/write at /output by the evaluation and export tasks below.
OUTPUT = "5be06b0c-237f-4797-8cc1-9cada3fae38e-output"
23+
def pipeline_wrapper(mock: List[Literal[MOCKED_STAGES]]):
    """Wrapper for KFP pipeline, which allows for mocking individual stages.

    Args:
        mock: stage names ("sdg", "train", "eval") whose components should be
            replaced by their faked counterparts.

    Returns:
        The decorated KFP pipeline function, ready for compilation.
    """

    # Imports for SDG stage
    if "sdg" in mock:
        from sdg.faked import git_clone_op, sdg_op
    else:
        from sdg import git_clone_op, sdg_op

    # Imports for Training stage
    if "train" in mock:
        from training.faked import pytorchjob_manifest_op

        # BUGFIX: data_processing_op is called unconditionally below, but it was
        # previously imported only in the non-mocked branch, so mocking "train"
        # raised a NameError. Import the real op here as well (no faked variant
        # is referenced anywhere in this file).
        from training import data_processing_op
        from utils.faked import (
            kubectl_apply_op,
            kubectl_wait_for_op,
            huggingface_importer_op,
            pvc_to_artifact_op,
            pvc_to_model_op,
        )
        from utils import artifact_to_pvc_op
    else:
        from training import data_processing_op, pytorchjob_manifest_op
        from utils import (
            kubectl_apply_op,
            kubectl_wait_for_op,
            artifact_to_pvc_op,
            huggingface_importer_op,
            pvc_to_artifact_op,
            pvc_to_model_op,
        )

    # Imports for MMLU, MT_BENCH stage
    # TODO: Add mock/fake components
    # NOTE(review): list_models_in_directory_op, run_mmlu_op,
    # load_mmlu_results_op, run_mt_bench_op and load_mt_bench_results_op are
    # imported but never used below — presumably leftovers from the removed
    # training wiring ("PIPELINE NO TRAINING"); kept to preserve behavior.
    from utils import list_models_in_directory_op
    from eval.mmlu import run_mmlu_op, load_mmlu_results_op
    from eval.mt_bench import run_mt_bench_op, load_mt_bench_results_op
    from eval.final import run_mmlu_branch_mt_bench_branch_op

    @dsl.pipeline(
        display_name="InstructLab",
        name="instructlab",
        description="InstructLab pipeline",
    )
    def pipeline(
        num_instructions_to_generate: int = 2,
        repo_url: str = "https://github.com/instructlab/taxonomy.git",
        repo_branch: Optional[str] = None,
        repo_pr: Optional[int] = None,
        storage_class_name: str = "nfs-csi",
        base_model: str = "ibm-granite/granite-7b-base",
        # minimal subset of MMLU_TASKS
        mmlu_tasks_list: str = "mmlu_anatomy,mmlu_astronomy",
        model_dtype: str = "bfloat16",
        few_shots: int = 5,
        batch_size: int = 8,
        merge_system_user_message: bool = False,
        # FIX: was annotated plain `str` with a None default — Optional[str]
        # matches the actual default and the repo_branch/repo_pr convention.
        device: Optional[str] = None,
    ):

        # SDG stage
        git_clone_task = git_clone_op(
            repo_branch=repo_branch,
            # Treat missing or non-positive PR numbers as "no PR".
            repo_pr=repo_pr if repo_pr and repo_pr > 0 else None,
            repo_url=repo_url,
        )

        sdg_task = sdg_op(
            num_instructions_to_generate=num_instructions_to_generate,
            taxonomy=git_clone_task.outputs["taxonomy"],
            repo_branch=repo_branch,
            repo_pr=repo_pr,
        )
        # Inject the teacher-model endpoint/credentials into the SDG task env.
        use_config_map_as_env(
            sdg_task, K8S_NAME, dict(endpoint="endpoint", model="model")
        )
        use_secret_as_env(sdg_task, K8S_NAME, {"api_key": "api_key"})

        # Training stage

        # We need to pass storage_class_name as "" to use the default StorageClass, if left empty, KFP uses "standard" StorageClass.
        # 'standard' != default StorageClass
        # https://github.com/kubeflow/pipelines/blob/1cded35cf5e93d8c8d32fefbddceb2eed8de9a0a/backend/src/v2/driver/driver.go#L1428-L1436
        # At least we made it a pipeline parameter
        model_pvc_task = CreatePVC(
            pvc_name_suffix="-model-cache",
            access_modes=["ReadWriteMany"],
            size="100Gi",
            storage_class_name=storage_class_name,
        )
        # Pull the base model from HuggingFace, then copy it onto the model PVC.
        model_to_artifact = huggingface_importer_op(repo_name=base_model)
        model_to_pvc_task = artifact_to_pvc_op(
            data=model_to_artifact.outputs["model"], pvc_path="/model"
        )
        # Copying to a PVC is a side effect — never serve a cached result.
        model_to_pvc_task.set_caching_options(False)
        mount_pvc(
            task=model_to_pvc_task, pvc_name=model_pvc_task.output, mount_path="/model"
        )

        # Data processing
        data_processing_task = data_processing_op(
            sdg=sdg_task.outputs["sdg"],
            model=model_to_artifact.outputs["model"],
        )

        sdg_input_pvc_task = CreatePVC(
            pvc_name_suffix="-sdg",
            access_modes=["ReadWriteMany"],
            size="1Gi",
            storage_class_name=storage_class_name,
        )
        sdg_to_pvc_task = artifact_to_pvc_op(
            data=data_processing_task.outputs["processed_data"], pvc_path="/data"
        )
        sdg_to_pvc_task.set_caching_options(False)
        mount_pvc(
            task=sdg_to_pvc_task, pvc_name=sdg_input_pvc_task.output, mount_path="/data"
        )

        ###

        ###

        # Evaluation stage: MMLU branch + MT-Bench branch over a model produced
        # by a *previous* training run (training is removed in this pipeline).
        final_eval_task = run_mmlu_branch_mt_bench_branch_op(
            base_model=model_to_artifact.outputs["model"],
            # hard-code from PVC
            # NOTE(review): this path assumes a specific checkpoint layout on
            # the pre-existing OUTPUT PVC (hf_format/samples_588) — confirm it
            # exists before running.
            candidate_model="/output/model/hf_format/samples_588",
            taxonomy=git_clone_task.outputs["taxonomy"],
            tasks=sdg_task.outputs["sdg"],
            base_branch=repo_branch,
            candidate_branch=repo_branch,
            merge_system_user_message=merge_system_user_message,
            model_dtype=model_dtype,
            few_shots=few_shots,
            batch_size=batch_size,
            device=device,
        )

        mount_pvc(
            task=final_eval_task, pvc_name=OUTPUT, mount_path="/output"
        )

        # Inject the judge-model endpoint/credentials into the eval task env.
        use_config_map_as_env(
            final_eval_task, JUDGE_CONFIG_MAP, dict(endpoint="JUDGE_ENDPOINT", model="JUDGE_NAME")
        )

        use_secret_as_env(final_eval_task, JUDGE_SECRET, {"api_key": "JUDGE_API_KEY"})

        # Evaluation needs exactly one NVIDIA GPU.
        final_eval_task.set_accelerator_type('nvidia.com/gpu')
        final_eval_task.set_accelerator_limit(1)

        # Technically `output_model_task` and `output_data_task` can happen before evaluation,
        # however the PVC can only be mounted once, so, setting these to _after_ so the eval proceeds.
        output_model_task = pvc_to_artifact_op(
            pvc_path="/output/data",
        )
        output_model_task.after(final_eval_task)
        output_model_task.set_caching_options(False)

        mount_pvc(
            task=output_model_task, pvc_name=OUTPUT, mount_path="/output/data"
        )

        output_data_task = pvc_to_model_op(
            pvc_path="/output/model",
        )
        output_data_task.after(final_eval_task)

        mount_pvc(
            task=output_data_task, pvc_name=OUTPUT, mount_path="/output/model"
        )

        # Clean up the scratch PVCs only after the outputs have been exported.
        # (The pre-existing OUTPUT PVC is deliberately left alone.)
        sdg_pvc_delete_task = DeletePVC(pvc_name=sdg_input_pvc_task.output)
        sdg_pvc_delete_task.after(output_data_task)

        model_pvc_delete_task = DeletePVC(pvc_name=model_pvc_task.output)
        model_pvc_delete_task.after(output_data_task)

        return

    return pipeline
203+
204+
205+
@click.command()
@click.option(
    "--mock",
    type=click.Choice(MOCKED_STAGES, case_sensitive=False),
    help="Mock part of the pipeline",
    multiple=True,
    default=[],
)
def cli(mock):
    """Build the pipeline (optionally with mocked stages) and compile it to pipeline.yaml."""
    pipeline_func = pipeline_wrapper(mock)

    # Single-step progress bar: the only unit of work is the KFP compilation.
    with click.progressbar(length=1, label="Generating pipeline") as bar:
        compiler.Compiler().compile(pipeline_func, "pipeline.yaml")
        bar.update(1)


if __name__ == "__main__":
    cli()

0 commit comments

Comments (0)