From 118acd51345e94032280cb8f66e6caad30a63b3a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Han?=
Date: Tue, 8 Oct 2024 12:08:30 +0200
Subject: [PATCH] Revamp Standalone
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit does multiple things:

* Move all standalone related work into the 'standalone' directory
* Add documentation on how to use the script, see README.md
* Add a new "show" command to display a Kubernetes Job example on how to run the script as part of a Kubernetes job. Run it like so: `./standalone.py show`
* Rework the SDG fetch to allow for push - this will be revisited once the final evaluation is implemented
* Hide all the SDG related commands for running SDG with the script; now the script only fetches SDG data
* Add Makefile to generate the standalone script and format it with Ruff

Signed-off-by: Sébastien Han
---
 Makefile                                    |   5 +
 README.md                                   |  10 +
 pipeline.py                                 |  18 +-
 standalone/README.md                        | 186 +++++++++++++++++
 standalone.py => standalone/standalone.py   | 210 ++++++++++++++++++--
 standalone.tpl => standalone/standalone.tpl | 198 ++++++++++++++++--
 6 files changed, 581 insertions(+), 46 deletions(-)
 create mode 100644 Makefile
 create mode 100644 standalone/README.md
 rename standalone.py => standalone/standalone.py (76%)
 rename standalone.tpl => standalone/standalone.tpl (90%)

diff --git a/Makefile b/Makefile
new file mode 100644
index 00000000..769cf6dd
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,5 @@
+.PHONY: standalone
+
+standalone:
+	python3 pipeline.py gen-standalone
+	ruff format standalone/standalone.py
diff --git a/README.md b/README.md
index d858c738..0c273eeb 100644
--- a/README.md
+++ b/README.md
@@ -42,3 +42,13 @@ To deploy a signed certificate in cluster follow [trusted cluster cert](signed-c
 
 This solution requires object storage to be in place either through S3 or using Noobaa.
 If you are using Noobaa apply the following [tuning paramters](noobaa/README.md)
+
+## Standalone Deployment
+
+See [standalone](standalone/README.md) for instructions on deploying the InstructLab solution
+without the need for RHOAI.
+To generate the `standalone.py` script, run the following command (the [ruff](https://docs.astral.sh/ruff/installation/) tool must be installed):
+
+```bash
+make standalone
+```
diff --git a/pipeline.py b/pipeline.py
index 35fdfae3..6d85341f 100644
--- a/pipeline.py
+++ b/pipeline.py
@@ -563,19 +563,20 @@ def gen_standalone():
 
     # Open the template file
     try:
-        with open(
-            STANDALONE_TEMPLATE_FILE_NAME, "r", encoding="utf-8"
-        ) as template_file:
+        standalone_template_path = path.join(
+            "standalone", STANDALONE_TEMPLATE_FILE_NAME
+        )
+        with open(standalone_template_path, "r", encoding="utf-8") as template_file:
             template_content = template_file.read()
     except FileNotFoundError as e:
         click.echo(
-            f"Error: The template file '{STANDALONE_TEMPLATE_FILE_NAME}' was not found.",
+            f"Error: The template file '{standalone_template_path}' was not found.",
             err=True,
         )
         raise click.exceptions.Exit(1) from e
     except IOError as e:
         click.echo(
-            f"Error: An I/O error occurred while reading '{STANDALONE_TEMPLATE_FILE_NAME}': {e}",
+            f"Error: An I/O error occurred while reading '{standalone_template_path}': {e}",
             err=True,
         )
         raise click.exceptions.Exit(1)
@@ -585,7 +586,7 @@ def gen_standalone():
         template = Template(template_content)
     except TemplateSyntaxError as e:
         click.echo(
-            f"Error: The template file '{STANDALONE_TEMPLATE_FILE_NAME}' contains a syntax error: {e}",
+            f"Error: The template file '{standalone_template_path}' contains a syntax error: {e}",
             err=True,
         )
         raise click.exceptions.Exit(1)
@@ -594,10 +595,11 @@ def gen_standalone():
     rendered_code = template.render(details)
 
     # Write the rendered code to a new Python file
-    with open(GENERATED_STANDALONE_FILE_NAME, "w", encoding="utf-8") as output_file:
+    standalone_script_path = path.join("standalone", GENERATED_STANDALONE_FILE_NAME)
+    with open(standalone_script_path, "w", encoding="utf-8") as output_file:
         output_file.write(rendered_code)
 
-    click.echo(f"Successfully generated '{GENERATED_STANDALONE_FILE_NAME}' script.")
+    click.echo(f"Successfully generated '{standalone_script_path}' script.")
 
 
 def get_executor_details(
diff --git a/standalone/README.md b/standalone/README.md
new file mode 100644
index 00000000..e8147e80
--- /dev/null
+++ b/standalone/README.md
@@ -0,0 +1,186 @@
+# Standalone Tool Documentation
+
+## Overview
+
+The `standalone.py` script simulates the [InstructLab](https://instructlab.ai/) workflow within a [Kubernetes](https://kubernetes.io/) environment,
+replicating the functionality of a [KubeFlow Pipeline](https://github.com/kubeflow/pipelines). This allows for distributed training and evaluation of models
+without relying on centralized orchestration tools like KubeFlow.
+
+The `standalone.py` tool provides support for fetching generated SDG (Synthetic Data Generation) data from an object store that complies with the AWS S3 specification. While AWS S3 is supported, alternative object storage solutions such as Ceph, Noobaa, and MinIO are also compatible.
+
+## Requirements
+
+Since the `standalone.py` script is designed to run within a Kubernetes environment, the following
+requirements must be met:
+* A Kubernetes cluster with the necessary resources to run the InstructLab workflow.
+  * Nodes with GPUs available for training.
+  * [KubeFlow training operator](https://github.com/kubeflow/training-operator) must be running and `PyTorchJob` CRD must be installed.
+  * A [StorageClass](https://kubernetes.io/docs/concepts/storage/storage-classes/) that supports dynamic provisioning with [ReadWriteMany](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes) access mode.
+* A Kubernetes configuration that allows access to the Kubernetes cluster.
+  * Both cluster and in-cluster configurations are supported.
+* SDG data generated and uploaded to an object store.
+
+> [!NOTE]
+> The script can be run outside of a Kubernetes environment or within a Kubernetes Job, but it requires a Kubernetes configuration file to access the cluster.
+
+> [!TIP]
+> Check the `show` command to display an example of a Kubernetes Job that runs the script. Use `./standalone.py show` to see the example.
+
+## Features
+
+* Run any part of the InstructLab workflow independently, or the full end-to-end workflow, in a standalone environment:
+  * Fetch SDG data from an object store.
+  * Train model.
+  * Evaluate model.
+  * Final model evaluation. (Not implemented yet)
+  * Push the final model back to the object store, to the same location as the SDG data. (Not implemented yet)
+
+The `standalone.py` script includes a main command to execute the full workflow, along with
+subcommands to run individual parts of the workflow separately. To view all available commands, use
+`./standalone.py --help`. The following command executes the end-to-end workflow: it retrieves the
+SDG data from the object store and sets up the necessary resources for running the training and
+evaluation steps:
+
+```bash
+./standalone.py run --namespace my-namespace --sdg-object-store-secret sdg-data
+```
+
+If you only want to fetch the SDG data, use the `sdg-data-fetch` subcommand:
+
+```bash
+./standalone.py run --namespace my-namespace --sdg-object-store-secret sdg-data sdg-data-fetch
+```
+
+Other subcommands are available to run the training and evaluation steps:
+
+```bash
+./standalone.py run --namespace my-namespace train
+./standalone.py run --namespace my-namespace evaluation
+```
+
+* Fetch SDG data from an object store.
+  * Support for AWS S3 and compatible object storage solutions.
+  * Configuration via CLI options, environment variables, or Kubernetes secrets.
+
+> [!CAUTION]
+> All the CLI options MUST be positioned after the `run` command and BEFORE any subcommands.
+
+## Usage
+
+The script requires information regarding the location and method for accessing the SDG data. This information can be provided in two main ways:
+
+1. CLI Options and/or Environment Variables: Supply all necessary information via CLI options or environment variables.
+2. Kubernetes Secret: Provide the name of a Kubernetes secret that contains all relevant details using the `--sdg-object-store-secret` option.
+
+### CLI Options
+
+* `--namespace`: The namespace in which the Kubernetes resources are located - **Required**
+* `--storage-class`: The storage class to use for the PVCs - **Optional** - Default: cluster default storage class.
+* `--nproc-per-node`: The number of processes to run per node - **Optional** - Default: 1.
+* `--sdg-object-store-secret`: The name of the Kubernetes secret containing the SDG object store credentials.
+* `--sdg-object-store-endpoint`: The endpoint of the object store. `SDG_OBJECT_STORE_ENDPOINT` environment variable can be used as well.
+* `--sdg-object-store-bucket`: The bucket name in the object store. `SDG_OBJECT_STORE_BUCKET` environment variable can be used as well.
+* `--sdg-object-store-access-key`: The access key for the object store. `SDG_OBJECT_STORE_ACCESS_KEY` environment variable can be used as well.
+* `--sdg-object-store-secret-key`: The secret key for the object store. `SDG_OBJECT_STORE_SECRET_KEY` environment variable can be used as well.
+* `--sdg-object-store-data-key`: The key for the SDG data in the object store, e.g., `sdg.tar.gz`. `SDG_OBJECT_STORE_DATA_KEY` environment variable can be used as well.
+* `--sdg-object-store-verify-tls`: Whether to verify TLS for the object store endpoint (default: true). `SDG_OBJECT_STORE_VERIFY_TLS` environment variable can be used as well.
+* `--sdg-object-store-region`: The region of the object store. `SDG_OBJECT_STORE_REGION` environment variable can be used as well.
+
+## Example End-To-End Workflow
+
+### Generating and Uploading SDG Data
+
+The following example demonstrates how to generate SDG data, package it as a tarball, and upload it
+to an object store. This assumes that the AWS CLI is installed and configured with the necessary
+credentials.
+In this scenario the bucket is named `sdg-data` and the tarball file is `sdg.tar.gz`.
+
+```bash
+ilab data generate
+cd generated
+tar -czvf sdg.tar.gz *
+aws s3 cp sdg.tar.gz s3://sdg-data/sdg.tar.gz
+```
+
+> [!CAUTION]
+> Ensure the SDG data is packaged as a tarball **without** top-level directories, so you must run `tar` inside the directory containing the SDG data.
+
+### Creating the Kubernetes Secret
+
+The simplest method to supply the script with the required information for retrieving SDG data is by
+creating a Kubernetes secret. In the example below, we create a secret called `sdg-data` within the
+`my-namespace` namespace, containing the necessary credentials. Ensure that you update the access
+key and secret key as needed. The `data_key` field refers to the name of the tarball file in the
+object store that holds the SDG data. In this case, it's named `sdg.tar.gz`, as we previously
+uploaded the tarball to the object store using this name.
+
+```bash
+cat <<EOF | kubectl create -f -
+apiVersion: v1
+kind: Secret
+metadata:
+  name: sdg-data
+  namespace: my-namespace
+type: Opaque
+stringData:
+  bucket: sdg-data
+  access_key: *****
+  secret_key: *****
+  data_key: sdg.tar.gz
+EOF
+```
+
+> [!WARNING]
+> The secret must be part of the same namespace as the resources that the script interacts with.
+> It's inherited from the `--namespace` option.
+
+The list of all supported keys:
+
+* `bucket`: The bucket name in the object store - **Required**
+* `access_key`: The access key for the object store - **Required**
+* `secret_key`: The secret key for the object store - **Required**
+* `data_key`: The key for the SDG data in the object store - **Required**
+* `verify_tls`: Whether to verify TLS for the object store endpoint (default: true) - **Optional**
+* `endpoint`: The endpoint of the object store, e.g. `https://s3.openshift-storage.svc:443` - **Optional**
+* `region`: The region of the object store - **Optional**
+
+#### Running the Script Without Kubernetes Secret
+
+Alternatively, you can provide the necessary information directly via CLI options or environment
+variables; the script will use the provided information to fetch the SDG data and create its own Kubernetes
+Secret named `sdg-object-store-credentials` in the same namespace as the resources it interacts with (in this case, `my-namespace`).
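+
+For example, using the environment variables documented in the CLI options section (the values
+below are illustrative and mirror the earlier examples):
+
+```bash
+export SDG_OBJECT_STORE_ACCESS_KEY=key
+export SDG_OBJECT_STORE_SECRET_KEY=key
+export SDG_OBJECT_STORE_BUCKET=sdg-data
+export SDG_OBJECT_STORE_DATA_KEY=sdg.tar.gz
+./standalone.py run --namespace my-namespace
+```
+
+Or pass the same values directly as CLI options: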
+ + +```bash +./standalone run \ + --namespace my-namespace \ + --sdg-object-store-access-key key \ + --sdg-object-store-secret-key key \ + --sdg-object-store-bucket sdg-data \ + --sdg-object-store-data-key sdg.tar.gz +``` + +#### Advanced Configuration Using an S3-Compatible Object Store + +If you don't use the official AWS S3 endpoint, you can provide additional information about the object store: + +```bash +./standalone run \ + --namespace foo \ + --sdg-object-store-access-key key \ + --sdg-object-store-secret-key key \ + --sdg-object-store-bucket sdg-data \ + --sdg-object-store-data-key sdg.tar.gz \ + --sdg-object-store-verify-tls false \ + --sdg-object-store-endpoint https://s3.openshift-storage.svc:443 +``` + +> [!IMPORTANT] +> The `--sdg-object-store-endpoint` option must be provided in the format +> `scheme://host:`, the port can be omitted if it's the default port. + +> [!TIP] +> If you don't want to run the entire workflow and only want to fetch the SDG data, you can use the `run sdg-data-fetch ` command diff --git a/standalone.py b/standalone/standalone.py similarity index 76% rename from standalone.py rename to standalone/standalone.py index 180a0318..5f2d0798 100755 --- a/standalone.py +++ b/standalone/standalone.py @@ -193,9 +193,16 @@ claimName: {output_pvc_name} """ # TODO: support signature version? -SDG_DATA_DOWNLOAD_SCRIPT = f""" +SDG_DATA_SCRIPT = """ set -e +export STRATEGY={strategy} + +if [ -z "$STRATEGY" ] || [ "$STRATEGY" == "None" ]; then + echo "STRATEGY is not set - must be 'download' or 'upload'" + exit 1 +fi + if python3 -c 'import boto3'; then echo 'boto3 is already installed' else @@ -212,19 +219,22 @@ import boto3 def str_to_bool(s): - if s is None: - return False - return s.lower() in ['true', '1', 't', 'y', 'yes'] + if s is None: + return False + return s.lower() in ['true', '1', 't', 'y', 'yes'] + +def build_boto3_client(): + return boto3.client( + 's3', + aws_access_key_id=os.getenv('SDG_OBJECT_STORE_ACCESS_KEY'), + aws_secret_access_key=os.getenv('SDG_OBJECT_STORE_SECRET_KEY'), + endpoint_url=os.getenv('SDG_OBJECT_STORE_ENDPOINT', None), + region_name=os.getenv('SDG_OBJECT_STORE_REGION', None), + verify=str_to_bool(os.getenv('SDG_OBJECT_STORE_VERIFY_TLS', None)) +) def download_s3_file(): - s3 = boto3.client( - 's3', - aws_access_key_id=os.getenv('SDG_OBJECT_STORE_ACCESS_KEY'), - aws_secret_access_key=os.getenv('SDG_OBJECT_STORE_SECRET_KEY'), - endpoint_url=os.getenv('SDG_OBJECT_STORE_ENDPOINT', None), - region_name=os.getenv('SDG_OBJECT_STORE_REGION', None), - verify=str_to_bool(os.getenv('SDG_OBJECT_STORE_VERIFY_TLS', None)) - ) + s3 = build_boto3_client() bucket_name = os.getenv('SDG_OBJECT_STORE_BUCKET') s3_key = os.getenv('SDG_OBJECT_STORE_DATA_KEY') @@ -232,13 +242,65 @@ def download_s3_file(): s3.download_file(bucket_name, s3_key, output_file) +def upload_s3_file(): + s3 = build_boto3_client() + + bucket_name = os.getenv('SDG_OBJECT_STORE_BUCKET') + s3_key = os.getenv('SDG_OBJECT_STORE_DATA_KEY') # TODO: change the name for the model name + input_file = '{SDG_PVC_MOUNT_PATH}/sdg.tar.gz' # TODO: change for model path + + s3.upload_file(input_file, bucket_name, s3_key) + if __name__ == "__main__": - download_s3_file() + if os.getenv('STRATEGY') == 'download': + print('Downloading file from S3') + download_s3_file() + elif os.getenv('STRATEGY') == 'upload': + print('Uploading file to S3') + upload_s3_file() + else: + raise ValueError('Unknown STRATEGY') EOF python "$tmp"/download_s3.py -mkdir -p {SDG_PVC_MOUNT_PATH}/generated -tar -xvf 
{SDG_PVC_MOUNT_PATH}/sdg.tar.gz -C {SDG_PVC_MOUNT_PATH}/generated + +if [[ "$STRATEGY" == "download" ]]; then + mkdir -p {SDG_PVC_MOUNT_PATH}/generated + tar -xvf {SDG_PVC_MOUNT_PATH}/sdg.tar.gz -C {SDG_PVC_MOUNT_PATH}/generated +fi +""" + +JOB_SCRIPT_EXAMPLE = """ +kind: Job +apiVersion: batch/v1 +metadata: + name: {name} + namespace: {namespace} +spec: + template: + spec: + serviceAccountName: {service_account} + containers: + - name: {name} + image: {image} + command: + - "python3" + - "/config/{script_name}" + - "run" + - "--namespace" + - "{namespace_workflow}" + - "--storage-class" + - "{storage_class}" + - "--sdg-object-store-secret" + - "{sdg_object_store_secret}" + volumeMounts: + - name: script-config + mountPath: /config + restartPolicy: Never + volumes: + - name: script-config + configMap: + name: {script_configmap} """ @@ -252,6 +314,99 @@ def cli(): """ +@cli.group(invoke_without_command=True) +@click.option( + "--namespace", + type=str, + default="default", + help="Kubernetes namespace to run the job", +) +@click.option( + "--namespace-workflow", + type=str, + default="default", + help="Kubernetes namespace to run the end-to-end workflow that the script will execute", +) +@click.option( + "--name", + type=str, + default="distributed-ilab", + help="Name of the Job to that can run the script", +) +@click.option( + "--image", + type=str, + help="Image to use to run the script in a Job", + required=True, +) +@click.option( + "--service-account", + type=str, + default="default", + help="Service account to use for the Job", +) +@click.option( + "--script-configmap", + type=str, + help="Name of the ConfigMap containing the standalone.py script", + required=True, +) +@click.option( + "--script-name", + type=str, + help="Name of the standalone script in the ConfigMap", + default="standalone.py", +) +@click.option( + "--storage-class", + type=str, + default="standard", + help="Storage class to use for the PersistentVolumeClaim - for SDG only", +) +@click.option( + "--sdg-object-store-secret", + envvar="SDG_OBJECT_STORE_SECRET", + help=( + "Name of the Kubernetes Secret containing the SDG object store credentials. " + "The namespace is inferred from the namespace option. " + "The following keys are expected: bucket, access_key, secret_key, data_key. " + " (SDG_OBJECT_STORE_SECRET env var)" + "If used, the endpoint, bucket, access_key, secret_key, region, data_key, verify_tls options will be ignored." + "All supported options are: endpoint, bucket, access_key, secret_key, region, data_key, verify_tls" + ), + default=SDG_OBJECT_STORE_SECRET_NAME, + type=str, +) +def show( + namespace: str, + namespace_workflow: str, + name: str, + image: str, + script_configmap: str, + script_name: str, + service_account: str, + storage_class: str, + sdg_object_store_secret: str, +): + """ + Print an example Job YAML to stdout to run the script in a Kubernetes cluster. + The job excepts the standalone.py script to be available in a ConfigMap. 
+ """ + print( + JOB_SCRIPT_EXAMPLE.format( + name=name, + namespace=namespace, + namespace_workflow=namespace_workflow, + image=image, + script_configmap=script_configmap, + script_name=script_name, + service_account=service_account, + storage_class=storage_class, + sdg_object_store_secret=sdg_object_store_secret, + ) + ) + + @cli.group(invoke_without_command=True) @click.option( "--namespace", type=str, default="default", help="Kubernetes namespace to use" @@ -261,16 +416,19 @@ def cli(): type=str, default=DEFAULT_REPO_URL, help="URL of the taxonomy repository - for SDG only", + hidden=True, ) @click.option( "--taxonomy-repo-branch", type=str, help="Branch of the taxonomy repository - for SDG only", + hidden=True, ) @click.option( "--taxonomy-repo-pr", type=str, help="Pull request number of the taxonomy repository - for SDG only", + hidden=True, ) @click.option( "--storage-class", @@ -282,11 +440,13 @@ def cli(): "--serving-endpoint", type=str, help="Serving endpoint for SDG - for SDG only", + hidden=True, ) @click.option( "--serving-model", type=str, help="Serving model for SDG - for SDG only", + hidden=True, ) @click.option( "--nproc-per-node", @@ -298,6 +458,7 @@ def cli(): "--eval-type", help="Type of evaluation to run", type=click.Choice(["mmlu", "mt-bench"]), + hidden=True, ) @click.option( "--training-phase", @@ -358,6 +519,7 @@ def cli(): "--sdg-object-store-verify-tls", envvar="SDG_OBJECT_STORE_VERIFY_TLS", help="Verify TLS for the object store. (SDG_OBJECT_STORE_VERIFY_TLS env var).", + default=True, type=bool, ) @click.option( @@ -557,7 +719,7 @@ def create_sdg_job( "sh", "-ec", 'program_path=$(mktemp -d)\n\nprintf "%s" "$0" > "$program_path/ephemeral_component.py"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"\n', - '\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef sdg_op(\n num_instructions_to_generate: int,\n taxonomy: dsl.Input[dsl.Dataset],\n sdg: dsl.Output[dsl.Dataset],\n repo_branch: Optional[str],\n repo_pr: Optional[int],\n):\n import openai\n from instructlab.sdg import generate_data\n from instructlab.sdg.utils.taxonomy import read_taxonomy\n from os import getenv\n\n api_key = getenv("api_key")\n model = getenv("model")\n endpoint = getenv("endpoint")\n client = openai.OpenAI(base_url=endpoint, api_key=api_key)\n\n taxonomy_base = "main" if repo_branch or (repo_pr and int(repo_pr) > 0) else "empty"\n\n print("Generating syntetic dataset for:")\n print()\n print(read_taxonomy(taxonomy.path, taxonomy_base))\n\n # generate_data has a magic word for its taxonomy_base argument - `empty`\n # it allows generating from the whole repo, see:\n # https://github.com/instructlab/sdg/blob/c6a9e74a1618b1077cd38e713b8aaed8b7c0c8ce/src/instructlab/sdg/utils/taxonomy.py#L230\n generate_data(\n client=client,\n num_instructions_to_generate=num_instructions_to_generate,\n output_dir=sdg.path,\n taxonomy=taxonomy.path,\n taxonomy_base=taxonomy_base,\n model_name=model,\n )\n\n', + '\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef sdg_op(\n num_instructions_to_generate: int,\n taxonomy: dsl.Input[dsl.Dataset],\n sdg: dsl.Output[dsl.Dataset],\n repo_branch: Optional[str],\n repo_pr: Optional[int],\n):\n from os import getenv\n\n import openai\n from instructlab.sdg import generate_data\n from instructlab.sdg.utils.taxonomy import read_taxonomy\n\n api_key = getenv("api_key")\n model = getenv("model")\n endpoint = getenv("endpoint")\n client = 
openai.OpenAI(base_url=endpoint, api_key=api_key)\n\n taxonomy_base = "main" if repo_branch or (repo_pr and int(repo_pr) > 0) else "empty"\n\n print("Generating syntetic dataset for:")\n print()\n print(read_taxonomy(taxonomy.path, taxonomy_base))\n\n # generate_data has a magic word for its taxonomy_base argument - `empty`\n # it allows generating from the whole repo, see:\n # https://github.com/instructlab/sdg/blob/c6a9e74a1618b1077cd38e713b8aaed8b7c0c8ce/src/instructlab/sdg/utils/taxonomy.py#L230\n generate_data(\n client=client,\n num_instructions_to_generate=num_instructions_to_generate,\n output_dir=sdg.path,\n taxonomy=taxonomy.path,\n taxonomy_base=taxonomy_base,\n model_name=model,\n chunk_word_count=1000,\n server_ctx_size=4096,\n )\n\n', ], args=[ "--executor_input", @@ -615,7 +777,7 @@ def create_sdg_job( "sh", "-ec", 'program_path=$(mktemp -d)\n\nprintf "%s" "$0" > "$program_path/ephemeral_component.py"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"\n', - '\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef data_processing_op(\n sdg: dsl.Input[dsl.Dataset],\n processed_data: dsl.Output[dsl.Dataset],\n model: dsl.Input[dsl.Artifact],\n max_seq_len: Optional[int] = 4096,\n max_batch_len: Optional[int] = 20000,\n):\n import instructlab.training.data_process as dp\n import os\n from instructlab.training import (\n TrainingArgs,\n DataProcessArgs,\n )\n\n # define training-specific arguments\n training_args = TrainingArgs(\n # define data-specific arguments\n model_path=model.path,\n data_path=f"{sdg.path}/*_train_msgs*.jsonl",\n data_output_dir=processed_data.path,\n # define model-trianing parameters\n max_seq_len=max_seq_len,\n max_batch_len=max_batch_len,\n # XXX(shanand): We don\'t need the following arguments\n # for data processing. Added them for now to avoid\n # Pydantic validation errors for TrainingArgs\n ckpt_output_dir="data/saved_checkpoints",\n num_epochs=2,\n effective_batch_size=3840,\n save_samples=0,\n learning_rate=2e-6,\n warmup_steps=800,\n is_padding_free=True,\n )\n\n def data_processing(train_args: TrainingArgs) -> None:\n # early validation logic here\n if train_args.max_batch_len < train_args.max_seq_len:\n raise ValueError(\n f"the `max_batch_len` cannot be less than `max_seq_len`: {train_args.max_batch_len=} < {train_args.max_seq_len=}"\n )\n\n # process the training data\n if not os.path.exists(train_args.data_output_dir):\n os.makedirs(train_args.data_output_dir, exist_ok=True)\n dp.main(\n DataProcessArgs(\n # XXX(osilkin): make a decision here, either:\n # 1. the CLI is fully responsible for managing where the data is written\n # 2. 
we never cache it and simply write it to a tmp file every time.\n #\n # An important reason for why #1 would be preferable is in the case of OpenShift/SELinux\n # where the user has a defined place for new temporary data to be written.\n data_output_path=train_args.data_output_dir,\n model_path=train_args.model_path,\n data_path=train_args.data_path,\n max_seq_len=train_args.max_seq_len,\n chat_tmpl_path=train_args.chat_tmpl_path,\n )\n )\n\n data_processing(train_args=training_args)\n\n', + '\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef data_processing_op(\n sdg: dsl.Input[dsl.Dataset],\n processed_data: dsl.Output[dsl.Dataset],\n model: dsl.Input[dsl.Artifact],\n max_seq_len: Optional[int] = 4096,\n max_batch_len: Optional[int] = 20000,\n):\n import os\n\n import instructlab.training.data_process as dp\n from instructlab.training import (\n DataProcessArgs,\n TrainingArgs,\n )\n\n # define training-specific arguments\n training_args = TrainingArgs(\n # define data-specific arguments\n model_path=model.path,\n data_path=f"{sdg.path}/*_train_msgs*.jsonl",\n data_output_dir=processed_data.path,\n # define model-trianing parameters\n max_seq_len=max_seq_len,\n max_batch_len=max_batch_len,\n # XXX(shanand): We don\'t need the following arguments\n # for data processing. Added them for now to avoid\n # Pydantic validation errors for TrainingArgs\n ckpt_output_dir="data/saved_checkpoints",\n num_epochs=2,\n effective_batch_size=3840,\n save_samples=0,\n learning_rate=2e-6,\n warmup_steps=800,\n is_padding_free=True,\n )\n\n def data_processing(train_args: TrainingArgs) -> None:\n # early validation logic here\n if train_args.max_batch_len < train_args.max_seq_len:\n raise ValueError(\n f"the `max_batch_len` cannot be less than `max_seq_len`: {train_args.max_batch_len=} < {train_args.max_seq_len=}"\n )\n\n # process the training data\n if not os.path.exists(train_args.data_output_dir):\n os.makedirs(train_args.data_output_dir, exist_ok=True)\n dp.main(\n DataProcessArgs(\n # XXX(osilkin): make a decision here, either:\n # 1. the CLI is fully responsible for managing where the data is written\n # 2. 
we never cache it and simply write it to a tmp file every time.\n #\n # An important reason for why #1 would be preferable is in the case of OpenShift/SELinux\n # where the user has a defined place for new temporary data to be written.\n data_output_path=train_args.data_output_dir,\n model_path=train_args.model_path,\n data_path=train_args.data_path,\n max_seq_len=train_args.max_seq_len,\n chat_tmpl_path=train_args.chat_tmpl_path,\n )\n )\n\n data_processing(train_args=training_args)\n\n', ], args=[ "--executor_input", @@ -720,7 +882,11 @@ def create_sdg_data_fetch_job( name="fetch-sdg-files-from-object-store", image=PYTHON_IMAGE, command=["/bin/sh", "-c"], - args=[SDG_DATA_DOWNLOAD_SCRIPT], + args=[ + SDG_DATA_SCRIPT.format( + strategy="download", SDG_PVC_MOUNT_PATH=SDG_PVC_MOUNT_PATH + ) + ], volume_mounts=get_sdg_vol_mount(), env=[ kubernetes.client.V1EnvVar( @@ -859,7 +1025,7 @@ def create_eval_job( "sh", "-ec", 'program_path=$(mktemp -d)\n\nprintf "%s" "$0" > "$program_path/ephemeral_component.py"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"\n', - '\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef run_mmlu_op(\n mmlu_output: Output[Artifact],\n models_path_prefix: str,\n mmlu_tasks_list: str,\n model_dtype: str,\n few_shots: int,\n batch_size: int,\n device: str = None,\n models_list: List[str] = None,\n models_folder: Optional[str] = None,\n) -> NamedTuple("outputs", best_model=str, best_score=float):\n import json\n import os\n import torch\n from instructlab.eval.mmlu import MMLUEvaluator, MMLU_TASKS\n\n mmlu_tasks = mmlu_tasks_list.split(",") if mmlu_tasks_list else MMLU_TASKS\n\n if models_list is None and models_folder:\n models_list = os.listdir(models_folder)\n\n # Device setup and debug\n gpu_available = torch.cuda.is_available()\n gpu_name = (\n torch.cuda.get_device_name(torch.cuda.current_device())\n if gpu_available\n else "No GPU available"\n )\n\n print(f"GPU Available: {gpu_available}, Using: {gpu_name}")\n\n effective_device = (\n device if device is not None else ("cuda" if gpu_available else "cpu")\n )\n print(f"Running on device: {effective_device}")\n\n scores = {}\n all_mmlu_data = []\n\n for model_name in models_list:\n model_path = f"{models_path_prefix}/{model_name}"\n # Debug\n print(f"Model {model_name} is stored at: {model_path}")\n\n # Evaluation\n evaluator = MMLUEvaluator(\n model_path=model_path,\n tasks=mmlu_tasks,\n model_dtype=model_dtype,\n few_shots=few_shots,\n batch_size=batch_size,\n device=effective_device,\n )\n\n mmlu_score, individual_scores = evaluator.run()\n average_score = round(mmlu_score, 2)\n print(\n f"Model {model_name} is stored at: {model_path} with AVERAGE_SCORE: {average_score}"\n )\n\n mmlu_data = {\n "report_title": "KNOWLEDGE EVALUATION REPORT",\n "model": model_name,\n "average_score": average_score,\n "number_of_tasks": len(individual_scores),\n "individual_scores": [\n {task: round(score["score"], 2)}\n for task, score in individual_scores.items()\n ],\n }\n\n all_mmlu_data.append(mmlu_data)\n scores[model_path] = average_score\n\n with open(mmlu_output.path, "w") as f:\n json.dump(all_mmlu_data, f, indent=4)\n outputs = NamedTuple("outputs", best_model=str, best_score=float)\n best_model = max(scores, key=scores.get)\n best_score = scores[best_model]\n return outputs(best_model=best_model, best_score=best_score)\n\n', + '\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef 
run_mmlu_op(\n mmlu_output: Output[Artifact],\n models_path_prefix: str,\n mmlu_tasks_list: str,\n model_dtype: str,\n few_shots: int,\n batch_size: int,\n device: str = None,\n models_list: List[str] = None,\n models_folder: Optional[str] = None,\n) -> NamedTuple("outputs", best_model=str, best_score=float):\n import json\n import os\n\n import torch\n from instructlab.eval.mmlu import MMLU_TASKS, MMLUEvaluator\n\n mmlu_tasks = mmlu_tasks_list.split(",") if mmlu_tasks_list else MMLU_TASKS\n\n if models_list is None and models_folder:\n models_list = os.listdir(models_folder)\n\n # Device setup and debug\n gpu_available = torch.cuda.is_available()\n gpu_name = (\n torch.cuda.get_device_name(torch.cuda.current_device())\n if gpu_available\n else "No GPU available"\n )\n\n print(f"GPU Available: {gpu_available}, Using: {gpu_name}")\n\n effective_device = (\n device if device is not None else ("cuda" if gpu_available else "cpu")\n )\n print(f"Running on device: {effective_device}")\n\n scores = {}\n all_mmlu_data = []\n\n for model_name in models_list:\n model_path = f"{models_path_prefix}/{model_name}"\n # Debug\n print(f"Model {model_name} is stored at: {model_path}")\n\n # Evaluation\n evaluator = MMLUEvaluator(\n model_path=model_path,\n tasks=mmlu_tasks,\n model_dtype=model_dtype,\n few_shots=few_shots,\n batch_size=batch_size,\n device=effective_device,\n )\n\n mmlu_score, individual_scores = evaluator.run()\n average_score = round(mmlu_score, 2)\n print(\n f"Model {model_name} is stored at: {model_path} with AVERAGE_SCORE: {average_score}"\n )\n\n mmlu_data = {\n "report_title": "KNOWLEDGE EVALUATION REPORT",\n "model": model_name,\n "average_score": average_score,\n "number_of_tasks": len(individual_scores),\n "individual_scores": [\n {task: round(score["score"], 2)}\n for task, score in individual_scores.items()\n ],\n }\n\n all_mmlu_data.append(mmlu_data)\n scores[model_path] = average_score\n\n with open(mmlu_output.path, "w") as f:\n json.dump(all_mmlu_data, f, indent=4)\n outputs = NamedTuple("outputs", best_model=str, best_score=float)\n best_model = max(scores, key=scores.get)\n best_score = scores[best_model]\n return outputs(best_model=best_model, best_score=best_score)\n\n', ], args=[ "--executor_input", @@ -889,7 +1055,7 @@ def create_eval_job( init_containers = [ kubernetes.client.V1Container( name=f"run-eval-{eval_type}", - image="quay.io/sallyom/instructlab-ocp:eval", + image="quay.io/sallyom/instructlab-ocp:eval-7ee213", command=[ "sh", "-c", @@ -897,7 +1063,7 @@ def create_eval_job( "sh", "-ec", 'program_path=$(mktemp -d)\n\nprintf "%s" "$0" > "$program_path/ephemeral_component.py"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"\n', - '\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef run_mt_bench_op(\n models_path_prefix: str,\n mt_bench_output: Output[Artifact],\n merge_system_user_message: bool,\n # generate_answers,judgment uses a magic word for its mt_bench evaluator - `auto`\n # with `auto`, number of gpus allocated for serving is calculated based on environment\n # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36\n max_workers: str = "auto",\n models_list: List[str] = None,\n models_folder: Optional[str] = None,\n device: str = None,\n) -> NamedTuple("outputs", best_model=str, best_score=float):\n def launch_vllm_server_background(\n model_path: str, gpu_count: int, retries: int = 60, delay: int = 5\n ):\n import 
subprocess\n import sys\n import time\n import requests\n\n if gpu_count > 0:\n command = [\n sys.executable,\n "-m",\n "vllm.entrypoints.openai.api_server",\n "--model",\n model_path,\n "--tensor-parallel-size",\n str(gpu_count),\n ]\n else:\n command = [\n sys.executable,\n "-m",\n "vllm.entrypoints.openai.api_server",\n "--model",\n model_path,\n ]\n\n subprocess.Popen(args=command)\n\n server_url = "http://localhost:8000/v1"\n print(f"Waiting for vLLM server to start at {server_url}...")\n\n for attempt in range(retries):\n try:\n response = requests.get(f"{server_url}/models")\n if response.status_code == 200:\n print(f"vLLM server is up and running at {server_url}.")\n return\n except requests.ConnectionError:\n pass\n\n print(\n f"Server not available yet, retrying in {delay} seconds (Attempt {attempt + 1}/{retries})..."\n )\n time.sleep(delay)\n\n raise RuntimeError(\n f"Failed to start vLLM server at {server_url} after {retries} retries."\n )\n\n # This seems like excessive effort to stop the vllm process, but merely saving & killing the pid doesn\'t work\n # Also, the base image does not include `pkill` cmd, so can\'t pkill -f vllm.entrypoints.openai.api_server either\n def stop_vllm_server_by_name():\n import psutil\n\n for process in psutil.process_iter(attrs=["pid", "name", "cmdline"]):\n cmdline = process.info.get("cmdline")\n if cmdline and "vllm.entrypoints.openai.api_server" in cmdline:\n print(\n f"Found vLLM server process with PID: {process.info[\'pid\']}, terminating..."\n )\n try:\n process.terminate() # Try graceful termination\n process.wait(timeout=5) # Wait a bit for it to terminate\n if process.is_running():\n print(\n f"Forcefully killing vLLM server process with PID: {process.info[\'pid\']}"\n )\n process.kill() # Force kill if it\'s still running\n print(\n f"Successfully stopped vLLM server with PID: {process.info[\'pid\']}"\n )\n except psutil.NoSuchProcess:\n print(f"Process with PID {process.info[\'pid\']} no longer exists.")\n except psutil.AccessDenied:\n print(\n f"Access denied when trying to terminate process with PID {process.info[\'pid\']}."\n )\n except Exception as e:\n print(\n f"Failed to terminate process with PID {process.info[\'pid\']}. 
Error: {e}"\n )\n\n import json\n import torch\n import os\n\n from instructlab.eval import mt_bench_answers, mt_bench_judgment\n\n os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"\n candidate_server_url = "http://localhost:8000/v1"\n\n gpu_available = torch.cuda.is_available()\n gpu_name = (\n torch.cuda.get_device_name(torch.cuda.current_device())\n if gpu_available\n else "No GPU available"\n )\n gpu_count = torch.cuda.device_count() if gpu_available else 0\n\n print(f"GPU Available: {gpu_available}, {gpu_name}")\n\n # See note above about magic word "auto"\n if max_workers == "auto":\n try:\n usable_cpu_count = len(os.sched_getaffinity(0)) // 2\n except AttributeError:\n usable_cpu_count = multiprocessing.cpu_count() // 2\n max_workers = usable_cpu_count\n\n # TODO: Using evaluator results in connection errors, need to determine why.\n # For now, using mt_bench_answers.generate_answers & mt_bench_judgment.generate_judgment\n # evaluator = MTBenchEvaluator(\n # model_name=candidate_model_name,\n # judge_model_name=judge_model_name,\n # max_workers=max_workers,\n # merge_system_user_message=merge_system_user_message\n # )\n\n if models_list is None and models_folder:\n models_list = os.listdir(models_folder)\n\n judge_api_key = os.getenv("JUDGE_API_KEY", "")\n judge_model_name = os.getenv("JUDGE_NAME")\n judge_endpoint = os.getenv("JUDGE_ENDPOINT")\n\n scores = {}\n all_mt_bench_data = []\n\n for model_name in models_list:\n print(f"Serving candidate model: {model_name}")\n model_path = f"{models_path_prefix}/{model_name}"\n\n # Launch the vLLM server and wait until it is ready\n launch_vllm_server_background(model_path, gpu_count)\n\n # model ID is the model_path value in vLLM\n print("Generating answers...")\n mt_bench_answers.generate_answers(\n model_name=model_path,\n model_api_base=candidate_server_url,\n output_dir="/tmp/eval_output",\n max_workers=max_workers,\n )\n\n print("Judging answers...")\n overall_score, qa_pairs, turn_scores, error_rate = (\n mt_bench_judgment.generate_judgment(\n model_name=model_path,\n judge_model_name=judge_model_name,\n model_api_base=judge_endpoint,\n api_key=judge_api_key,\n output_dir="/tmp/eval_output",\n max_workers=max_workers,\n merge_system_user_message=merge_system_user_message,\n )\n )\n\n stop_vllm_server_by_name()\n\n mt_bench_data = {\n "report_title": "SKILLS EVALUATION REPORT",\n "model": model_path,\n "judge_model": judge_model_name,\n "overall_score": overall_score,\n "turn_scores": turn_scores,\n "qa_scores": qa_pairs,\n "error_rate": error_rate,\n }\n\n all_mt_bench_data.append(mt_bench_data)\n scores[model_path] = overall_score\n\n with open(mt_bench_output.path, "w") as f:\n json.dump(all_mt_bench_data, f, indent=4)\n\n outputs = NamedTuple("outputs", best_model=str, best_score=float)\n best_model = max(scores, key=scores.get)\n best_score = scores[best_model]\n return outputs(best_model=best_model, best_score=best_score)\n\n', + '\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef run_mt_bench_op(\n models_path_prefix: str,\n mt_bench_output: Output[Artifact],\n merge_system_user_message: bool,\n # generate_answers,judgment uses a magic word for its mt_bench evaluator - `auto`\n # with `auto`, number of gpus allocated for serving is calculated based on environment\n # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36\n max_workers: str,\n models_list: List[str] = None,\n models_folder: Optional[str] = None,\n device: str = None,\n) -> 
NamedTuple("outputs", best_model=str, best_score=float):\n def launch_vllm(model_path: str, gpu_count: int, retries: int = 60, delay: int = 5):\n import subprocess\n import sys\n import time\n\n import requests\n\n if gpu_count > 0:\n command = [\n sys.executable,\n "-m",\n "vllm.entrypoints.openai.api_server",\n "--model",\n model_path,\n "--tensor-parallel-size",\n str(gpu_count),\n ]\n else:\n command = [\n sys.executable,\n "-m",\n "vllm.entrypoints.openai.api_server",\n "--model",\n model_path,\n ]\n\n subprocess.Popen(args=command)\n\n server_url = "http://localhost:8000/v1"\n print(f"Waiting for vLLM server to start at {server_url}...")\n\n for attempt in range(retries):\n try:\n response = requests.get(f"{server_url}/models")\n if response.status_code == 200:\n print(f"vLLM server is up and running at {server_url}.")\n return\n except requests.ConnectionError:\n pass\n\n print(\n f"Server not available yet, retrying in {delay} seconds (Attempt {attempt + 1}/{retries})..."\n )\n time.sleep(delay)\n\n raise RuntimeError(\n f"Failed to start vLLM server at {server_url} after {retries} retries."\n )\n\n # This seems like excessive effort to stop the vllm process, but merely saving & killing the pid doesn\'t work\n # Also, the base image does not include `pkill` cmd, so can\'t pkill -f vllm.entrypoints.openai.api_server either\n def stop_vllm_server_by_name():\n import psutil\n\n for process in psutil.process_iter(attrs=["pid", "name", "cmdline"]):\n cmdline = process.info.get("cmdline")\n if cmdline and "vllm.entrypoints.openai.api_server" in cmdline:\n print(\n f"Found vLLM server process with PID: {process.info[\'pid\']}, terminating..."\n )\n try:\n process.terminate() # Try graceful termination\n process.wait(timeout=5) # Wait a bit for it to terminate\n if process.is_running():\n print(\n f"Forcefully killing vLLM server process with PID: {process.info[\'pid\']}"\n )\n process.kill() # Force kill if it\'s still running\n print(\n f"Successfully stopped vLLM server with PID: {process.info[\'pid\']}"\n )\n except psutil.NoSuchProcess:\n print(f"Process with PID {process.info[\'pid\']} no longer exists.")\n except psutil.AccessDenied:\n print(\n f"Access denied when trying to terminate process with PID {process.info[\'pid\']}."\n )\n except Exception as e:\n print(\n f"Failed to terminate process with PID {process.info[\'pid\']}. 
Error: {e}"\n )\n\n import json\n import os\n\n import torch\n from instructlab.eval.mt_bench import MTBenchEvaluator\n\n os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"\n vllm_server = "http://localhost:8000/v1"\n\n gpu_available = torch.cuda.is_available()\n gpu_name = (\n torch.cuda.get_device_name(torch.cuda.current_device())\n if gpu_available\n else "No GPU available"\n )\n gpu_count = torch.cuda.device_count() if gpu_available else 0\n\n print(f"GPU Available: {gpu_available}, {gpu_name}")\n\n if models_list is None and models_folder:\n models_list = os.listdir(models_folder)\n\n judge_api_key = os.getenv("JUDGE_API_KEY", "")\n judge_model_name = os.getenv("JUDGE_NAME")\n judge_endpoint = os.getenv("JUDGE_ENDPOINT")\n\n scores = {}\n all_mt_bench_data = []\n\n # generate_answers,judgment uses a magic word for its mt_bench evaluator - `auto`\n # with `auto`, number of gpus allocated for serving is calculated based on environment\n # https://github.com/instructlab/eval/blob/main/src/instructlab/eval/mt_bench.py#L36\n if max_workers == "auto":\n try:\n usable_cpu_count = len(os.sched_getaffinity(0)) // 2\n except AttributeError:\n usable_cpu_count = multiprocessing.cpu_count() // 2\n max_workers = usable_cpu_count\n\n for model_name in models_list:\n print(f"Serving candidate model: {model_name}")\n model_path = f"{models_path_prefix}/{model_name}"\n\n launch_vllm(model_path, gpu_count)\n\n # model ID is the model_path value in vLLM\n evaluator = MTBenchEvaluator(\n model_name=model_path,\n judge_model_name=judge_model_name,\n output_dir="/tmp/eval_output",\n merge_system_user_message=merge_system_user_message,\n )\n\n evaluator.gen_answers(\n server_url=vllm_server,\n serving_gpus=gpu_count,\n max_workers=max_workers,\n )\n\n stop_vllm_server_by_name()\n\n overall_score, qa_pairs, turn_scores, error_rate = evaluator.judge_answers(\n server_url=judge_endpoint,\n api_key=judge_api_key,\n serving_gpus=gpu_count,\n max_workers=max_workers,\n )\n\n mt_bench_data = {\n "report_title": "SKILLS EVALUATION REPORT",\n "model": model_path,\n "judge_model": judge_model_name,\n "overall_score": overall_score,\n "turn_scores": turn_scores,\n "qa_scores": qa_pairs,\n "error_rate": error_rate,\n }\n\n all_mt_bench_data.append(mt_bench_data)\n scores[model_path] = overall_score\n\n with open(mt_bench_output.path, "w") as f:\n json.dump(all_mt_bench_data, f, indent=4)\n\n outputs = NamedTuple("outputs", best_model=str, best_score=float)\n best_model = max(scores, key=scores.get)\n best_score = scores[best_model]\n return outputs(best_model=best_model, best_score=best_score)\n\n', ], args=[ "--executor_input", @@ -914,7 +1080,7 @@ def create_eval_job( ] container = kubernetes.client.V1Container( name=f"output-eval-{eval_type}-scores", - image="quay.io/sallyom/instructlab-ocp:eval", + image="quay.io/sallyom/instructlab-ocp:eval-7ee213", command=["/bin/sh", "-c"], args=[f"cat {MT_BENCH_SCORES_PATH}"], volume_mounts=[ diff --git a/standalone.tpl b/standalone/standalone.tpl similarity index 90% rename from standalone.tpl rename to standalone/standalone.tpl index c62a4722..ea345faa 100755 --- a/standalone.tpl +++ b/standalone/standalone.tpl @@ -178,9 +178,16 @@ spec: claimName: {output_pvc_name} """ # TODO: support signature version? 
-SDG_DATA_DOWNLOAD_SCRIPT = f""" +SDG_DATA_SCRIPT = """ set -e +export STRATEGY={strategy} + +if [ -z "$STRATEGY" ] || [ "$STRATEGY" == "None" ]; then + echo "STRATEGY is not set - must be 'download' or 'upload'" + exit 1 +fi + if python3 -c 'import boto3'; then echo 'boto3 is already installed' else @@ -197,19 +204,22 @@ import os import boto3 def str_to_bool(s): - if s is None: - return False - return s.lower() in ['true', '1', 't', 'y', 'yes'] + if s is None: + return False + return s.lower() in ['true', '1', 't', 'y', 'yes'] + +def build_boto3_client(): + return boto3.client( + 's3', + aws_access_key_id=os.getenv('SDG_OBJECT_STORE_ACCESS_KEY'), + aws_secret_access_key=os.getenv('SDG_OBJECT_STORE_SECRET_KEY'), + endpoint_url=os.getenv('SDG_OBJECT_STORE_ENDPOINT', None), + region_name=os.getenv('SDG_OBJECT_STORE_REGION', None), + verify=str_to_bool(os.getenv('SDG_OBJECT_STORE_VERIFY_TLS', None)) +) def download_s3_file(): - s3 = boto3.client( - 's3', - aws_access_key_id=os.getenv('SDG_OBJECT_STORE_ACCESS_KEY'), - aws_secret_access_key=os.getenv('SDG_OBJECT_STORE_SECRET_KEY'), - endpoint_url=os.getenv('SDG_OBJECT_STORE_ENDPOINT', None), - region_name=os.getenv('SDG_OBJECT_STORE_REGION', None), - verify=str_to_bool(os.getenv('SDG_OBJECT_STORE_VERIFY_TLS', None)) - ) + s3 = build_boto3_client() bucket_name = os.getenv('SDG_OBJECT_STORE_BUCKET') s3_key = os.getenv('SDG_OBJECT_STORE_DATA_KEY') @@ -217,13 +227,65 @@ def download_s3_file(): s3.download_file(bucket_name, s3_key, output_file) +def upload_s3_file(): + s3 = build_boto3_client() + + bucket_name = os.getenv('SDG_OBJECT_STORE_BUCKET') + s3_key = os.getenv('SDG_OBJECT_STORE_DATA_KEY') # TODO: change the name for the model name + input_file = '{SDG_PVC_MOUNT_PATH}/sdg.tar.gz' # TODO: change for model path + + s3.upload_file(input_file, bucket_name, s3_key) + if __name__ == "__main__": - download_s3_file() + if os.getenv('STRATEGY') == 'download': + print('Downloading file from S3') + download_s3_file() + elif os.getenv('STRATEGY') == 'upload': + print('Uploading file to S3') + upload_s3_file() + else: + raise ValueError('Unknown STRATEGY') EOF python "$tmp"/download_s3.py -mkdir -p {SDG_PVC_MOUNT_PATH}/generated -tar -xvf {SDG_PVC_MOUNT_PATH}/sdg.tar.gz -C {SDG_PVC_MOUNT_PATH}/generated + +if [[ "$STRATEGY" == "download" ]]; then + mkdir -p {SDG_PVC_MOUNT_PATH}/generated + tar -xvf {SDG_PVC_MOUNT_PATH}/sdg.tar.gz -C {SDG_PVC_MOUNT_PATH}/generated +fi +""" + +JOB_SCRIPT_EXAMPLE = """ +kind: Job +apiVersion: batch/v1 +metadata: + name: {name} + namespace: {namespace} +spec: + template: + spec: + serviceAccountName: {service_account} + containers: + - name: {name} + image: {image} + command: + - "python3" + - "/config/{script_name}" + - "run" + - "--namespace" + - "{namespace_workflow}" + - "--storage-class" + - "{storage_class}" + - "--sdg-object-store-secret" + - "{sdg_object_store_secret}" + volumeMounts: + - name: script-config + mountPath: /config + restartPolicy: Never + volumes: + - name: script-config + configMap: + name: {script_configmap} """ @@ -237,6 +299,99 @@ def cli(): """ +@cli.group(invoke_without_command=True) +@click.option( + "--namespace", + type=str, + default="default", + help="Kubernetes namespace to run the job", +) +@click.option( + "--namespace-workflow", + type=str, + default="default", + help="Kubernetes namespace to run the end-to-end workflow that the script will execute", +) +@click.option( + "--name", + type=str, + default="distributed-ilab", + help="Name of the Job to that can run the script", +) 
+@click.option( + "--image", + type=str, + help="Image to use to run the script in a Job", + required=True, +) +@click.option( + "--service-account", + type=str, + default="default", + help="Service account to use for the Job", +) +@click.option( + "--script-configmap", + type=str, + help="Name of the ConfigMap containing the standalone.py script", + required=True, +) +@click.option( + "--script-name", + type=str, + help="Name of the standalone script in the ConfigMap", + default="standalone.py", +) +@click.option( + "--storage-class", + type=str, + default="standard", + help="Storage class to use for the PersistentVolumeClaim - for SDG only", +) +@click.option( + "--sdg-object-store-secret", + envvar="SDG_OBJECT_STORE_SECRET", + help=( + "Name of the Kubernetes Secret containing the SDG object store credentials. " + "The namespace is inferred from the namespace option. " + "The following keys are expected: bucket, access_key, secret_key, data_key. " + " (SDG_OBJECT_STORE_SECRET env var)" + "If used, the endpoint, bucket, access_key, secret_key, region, data_key, verify_tls options will be ignored." + "All supported options are: endpoint, bucket, access_key, secret_key, region, data_key, verify_tls" + ), + default=SDG_OBJECT_STORE_SECRET_NAME, + type=str, +) +def show( + namespace: str, + namespace_workflow: str, + name: str, + image: str, + script_configmap: str, + script_name: str, + service_account: str, + storage_class: str, + sdg_object_store_secret: str, +): + """ + Print an example Job YAML to stdout to run the script in a Kubernetes cluster. + The job excepts the standalone.py script to be available in a ConfigMap. + """ + print( + JOB_SCRIPT_EXAMPLE.format( + name=name, + namespace=namespace, + namespace_workflow=namespace_workflow, + image=image, + script_configmap=script_configmap, + script_name=script_name, + service_account=service_account, + storage_class=storage_class, + sdg_object_store_secret=sdg_object_store_secret, + ) + ) + + @cli.group(invoke_without_command=True) @click.option( "--namespace", type=str, default="default", help="Kubernetes namespace to use" @@ -246,16 +401,19 @@ def cli(): type=str, default=DEFAULT_REPO_URL, help="URL of the taxonomy repository - for SDG only", + hidden=True, ) @click.option( "--taxonomy-repo-branch", type=str, help="Branch of the taxonomy repository - for SDG only", + hidden=True, ) @click.option( "--taxonomy-repo-pr", type=str, help="Pull request number of the taxonomy repository - for SDG only", + hidden=True, ) @click.option( "--storage-class", @@ -267,11 +425,13 @@ def cli(): "--serving-endpoint", type=str, help="Serving endpoint for SDG - for SDG only", + hidden=True, ) @click.option( "--serving-model", type=str, help="Serving model for SDG - for SDG only", + hidden=True, ) @click.option( "--nproc-per-node", @@ -283,6 +443,7 @@ def cli(): "--eval-type", help="Type of evaluation to run", type=click.Choice(["mmlu", "mt-bench"]), + hidden=True, ) @click.option( "--training-phase", @@ -343,6 +504,7 @@ def cli(): "--sdg-object-store-verify-tls", envvar="SDG_OBJECT_STORE_VERIFY_TLS", help="Verify TLS for the object store. 
(SDG_OBJECT_STORE_VERIFY_TLS env var).", + default=True, type=bool, ) @click.option( @@ -664,7 +826,11 @@ def create_sdg_data_fetch_job( name="fetch-sdg-files-from-object-store", image=PYTHON_IMAGE, command=["/bin/sh", "-c"], - args=[SDG_DATA_DOWNLOAD_SCRIPT], + args=[ + SDG_DATA_SCRIPT.format( + strategy="download", SDG_PVC_MOUNT_PATH=SDG_PVC_MOUNT_PATH + ) + ], volume_mounts=get_sdg_vol_mount(), env=[ kubernetes.client.V1EnvVar(
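
The README above notes that the example Job printed by the new `show` command expects `standalone.py` to be available in a ConfigMap mounted at `/config`. A minimal sketch of that flow, assuming a hypothetical ConfigMap name (`standalone-script`) and a placeholder container image, could look like:

```bash
# The ConfigMap name and image below are placeholders; adjust them to your environment.
# Publish the generated script as a ConfigMap (the key becomes the file basename, standalone.py),
# then render the example Job with `show` and apply it.
kubectl -n my-namespace create configmap standalone-script --from-file=standalone/standalone.py
./standalone.py show \
  --namespace my-namespace \
  --namespace-workflow my-namespace \
  --image quay.io/example/standalone:latest \
  --script-configmap standalone-script \
  --sdg-object-store-secret sdg-data | kubectl apply -f -
```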