Skip to content

Commit

Permalink
refactoring code to parquet to zip2parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
blublinsky committed Aug 22, 2024
1 parent 82a4758 commit 22e7636
Show file tree
Hide file tree
Showing 49 changed files with 276 additions and 197 deletions.
4 changes: 2 additions & 2 deletions .make.versions
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ DOC_QUALITY_RAY_VERSION=$(DPK_VERSION)
CODE_QUALITY_RAY_VERSION=$(DPK_VERSION)
CODE_QUALITY_PYTHON_VERSION=$(DPK_VERSION)

CODE2PARQUET_PYTHON_VERSION=$(DPK_VERSION)
CODE2PARQUET_RAY_VERSION=$(DPK_VERSION)
ZIP2PARQUET_PYTHON_VERSION=$(DPK_VERSION)
ZIP2PARQUET_RAY_VERSION=$(DPK_VERSION)
INGEST_TO_PARQUET_VERSION=$(DPK_VERSION)
REPO_LVL_ORDER_RAY_VERSION=$(DPK_VERSION)

Expand Down
2 changes: 1 addition & 1 deletion kfp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
|-------------------------------------|:----------------------------------------------------------------------------------:|
| language/lang_id | [lang_id_wf.py](../transforms/language/lang_id/kfp_ray/lang_id_wf.py) |
| code/malware | [malware_wf.py](../transforms/code/malware/kfp_ray/malware_wf.py) |
| code/code2parquet | [code2parquet_wf.py](../transforms/code/code2parquet/kfp_ray/code2parquet_wf.py) |
| universal/zip2parquet | [zip2parquet_wf.py](../transforms/universal/zip2parquet/kfp_ray/zip2parquet_wf.py) |
| code/code_quality | [code_quality_wf.py](../transforms/code/code_quality/kfp_ray/code_quality_wf.py) |
| code/proglang_select | [proglang_select_wf.py](../transforms/code/proglang_select/kfp_ray/proglang_select_wf.py) |
| universal/doc_id | [doc_id_wf.py](../transforms/universal/doc_id/kfp_ray/doc_id_wf.py) |
Expand Down
2 changes: 1 addition & 1 deletion tools/ingest2parquet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

**Please note: This tool is deprecated and will be removed soon.
It is superseded by the transform-based implementation,
[code2parquet](../../transforms/code/code2parquet), providing identical capability,
[zip2parquet](../../transforms/universal/zip2parquet), providing identical capability,
but with support for ray-based scalability.**

## Summary
Expand Down

This file was deleted.

40 changes: 0 additions & 40 deletions transforms/code/code2parquet/ray/test-data/expected/metadata.json

This file was deleted.

File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


## Summary
This project allows execution of the [noop Ray transform](../ray) as a
This project allows execution of the [zip2parquet Ray transform](../ray) as a
[KubeFlow Pipeline](https://www.kubeflow.org/docs/components/pipelines/overview/)

The detailed pipeline is presented in the [Simplest Transform pipeline tutorial](../../../../kfp/doc/simple_transform_pipeline.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@


# the name of the job script
EXEC_SCRIPT_NAME: str = "code2parquet_transform_ray.py"
EXEC_SCRIPT_NAME: str = "zip2parquet_transform_ray.py"

task_image = "quay.io/dataprep1/data-prep-kit/code2parquet-ray:latest"
task_image = "quay.io/dataprep1/data-prep-kit/zip2parquet-ray:latest"


# components
Expand All @@ -42,10 +42,12 @@ def compute_exec_params_func(
runtime_pipeline_id: str,
runtime_job_id: str,
runtime_code_location: dict,
code2parquet_supported_langs_file: str,
code2parquet_domain: str,
code2parquet_snapshot: str,
code2parquet_detect_programming_lang: bool,
zip2parquet_code_data: bool,
zip2parquet_programming_language_column: str,
zip2parquet_supported_langs_file: str,
zip2parquet_domain: str,
zip2parquet_snapshot: str,
zip2parquet_detect_programming_lang: bool,
) -> dict:
from runtime_utils import KFPUtils

Expand All @@ -59,10 +61,12 @@ def compute_exec_params_func(
"runtime_pipeline_id": runtime_pipeline_id,
"runtime_job_id": runtime_job_id,
"runtime_code_location": str(runtime_code_location),
"code2parquet_supported_langs_file": code2parquet_supported_langs_file,
"code2parquet_domain": code2parquet_domain,
"code2parquet_snapshot": code2parquet_snapshot,
"code2parquet_detect_programming_lang": code2parquet_detect_programming_lang,
"zip2parquet_code_data": zip2parquet_code_data,
"zip2parquet_programming_language_column": zip2parquet_programming_language_column,
"zip2parquet_supported_langs_file": zip2parquet_supported_langs_file,
"zip2parquet_domain": zip2parquet_domain,
"zip2parquet_snapshot": zip2parquet_snapshot,
"zip2parquet_detect_programming_lang": zip2parquet_detect_programming_lang,
}


Expand Down Expand Up @@ -97,22 +101,22 @@ def compute_exec_params_func(
# clean up Ray
cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml")
# Task name is part of the pipeline name, the ray cluster name and the job name in DMF.
TASK_NAME: str = "code2parquet"
PREFIX: str = "code2parquet"
TASK_NAME: str = "zip2parquet"
PREFIX: str = "zip2parquet"


@dsl.pipeline(
name=TASK_NAME + "-ray-pipeline",
description="Pipeline for converting zip files to parquet",
)
def code2parquet(
ray_name: str = "code2parquet-kfp-ray", # name of Ray cluster
def zip2parquet(
ray_name: str = "zip2parquet-kfp-ray", # name of Ray cluster
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
# data access
data_s3_config: str = "{'input_folder': 'test/code2parquet/input', 'output_folder': 'test/code2parquet/output/'}",
data_s3_config: str = "{'input_folder': 'test/zip2parquet/input', 'output_folder': 'test/zip2parquet/output/'}",
data_s3_access_secret: str = "s3-secret",
data_max_files: int = -1,
data_num_samples: int = -1,
Expand All @@ -121,12 +125,14 @@ def code2parquet(
runtime_actor_options: dict = {'num_cpus': 0.8},
runtime_pipeline_id: str = "pipeline_id",
runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
# code to parquet
code2parquet_supported_langs_file: str = "test/code2parquet/languages/lang_extensions.json",
code2parquet_detect_programming_lang: bool = True,
code2parquet_domain: str = "code",
code2parquet_snapshot: str = "github",
code2parquet_s3_access_secret: str = "s3-secret",
# zip to parquet
zip2parquet_code_data: bool = True,
zip2parquet_programming_language_column: str = "programming_language",
zip2parquet_supported_langs_file: str = "test/zip2parquet/languages/lang_extensions.json",
zip2parquet_detect_programming_lang: bool = True,
zip2parquet_domain: str = "code",
zip2parquet_snapshot: str = "github",
zip2parquet_s3_access_secret: str = "s3-secret",
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
) -> None:
Expand Down Expand Up @@ -162,11 +168,13 @@ def code2parquet(
:param runtime_actor_options - actor options
:param runtime_pipeline_id - pipeline id
:param runtime_code_location - code location
:param code2parquet_supported_langs_file - file to store allowed languages
:param code2parquet_detect_programming_lang - detect programming language flag
:param code2parquet_domain: domain
:param code2parquet_snapshot: snapshot
:param code2parquet_s3_access_secret - ingest to parquet s3 access secret
:param zip2parquet_code_data - flag that data is code
:param zip2parquet_programming_language_column - name for programming language column
:param zip2parquet_supported_langs_file - file to store allowed languages
:param zip2parquet_detect_programming_lang - detect programming language flag
:param zip2parquet_domain: domain
:param zip2parquet_snapshot: snapshot
:param zip2parquet_s3_access_secret - ingest to parquet s3 access secret
(here we are assuming that select language info is in S3, but potentially in the different bucket)
:return: None
"""
Expand All @@ -186,10 +194,12 @@ def code2parquet(
runtime_pipeline_id=runtime_pipeline_id,
runtime_job_id=run_id,
runtime_code_location=runtime_code_location,
code2parquet_supported_langs_file=code2parquet_supported_langs_file,
code2parquet_domain=code2parquet_domain,
code2parquet_snapshot=code2parquet_snapshot,
code2parquet_detect_programming_lang=code2parquet_detect_programming_lang,
zip2parquet_code_data=zip2parquet_code_data,
zip2parquet_programming_language_column=zip2parquet_programming_language_column,
zip2parquet_supported_langs_file=zip2parquet_supported_langs_file,
zip2parquet_domain=zip2parquet_domain,
zip2parquet_snapshot=zip2parquet_snapshot,
zip2parquet_detect_programming_lang=zip2parquet_detect_programming_lang,
)
ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
# start Ray cluster
Expand All @@ -216,10 +226,10 @@ def code2parquet(
)
ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC)
ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret)
ComponentUtils.set_s3_env_vars_to_component(execute_job, code2parquet_s3_access_secret, prefix=PREFIX)
ComponentUtils.set_s3_env_vars_to_component(execute_job, zip2parquet_s3_access_secret, prefix=PREFIX)
execute_job.after(ray_cluster)


if __name__ == "__main__":
# Compiling the pipeline
compiler.Compiler().compile(code2parquet, __file__.replace(".py", ".yaml"))
compiler.Compiler().compile(zip2parquet, __file__.replace(".py", ".yaml"))
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
test-data/output
output/*
output/metadata.json
/output/
data-processing-lib/

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ RUN cd data-processing-lib-python && pip install --no-cache-dir -e .

# END OF STEPS destined for a data-prep-kit base image

COPY --chown=dpk:root src/ src/
COPY --chown=dpk:root src src/
COPY --chown=dpk:root pyproject.toml pyproject.toml
RUN pip install --no-cache-dir -e .

# copy the main() entry point to the image
COPY ./src/code2parquet_transform_python.py .
COPY src/zip2parquet_transform_python.py .

# copy some of the samples in
COPY ./src/code2parquet_local.py local/
COPY src/zip2parquet_local.py local/

# copy test
COPY test/ test/
COPY test-data/ test-data/
COPY test test/
COPY test-data test-data/

# Set environment
ENV PYTHONPATH /home/dpk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ REPOROOT=../../../..

# $(REPOROOT)/.make.versions file contains the versions

TRANSFORM_NAME=code2parquet
TRANSFORM_NAME=zip2parquet

include $(REPOROOT)/transforms/.make.transforms

Expand All @@ -33,7 +33,7 @@ setup:: .transforms.setup

# distribution versions is the same as image version.
set-versions:
$(MAKE) TRANSFORM_PYTHON_VERSION=$(CODE2PARQUET_PYTHON_VERSION) TOML_VERSION=$(CODE2PARQUET_PYTHON_VERSION) .transforms.set-versions
$(MAKE) TRANSFORM_PYTHON_VERSION=$(ZIP2PARQUET_PYTHON_VERSION) TOML_VERSION=$(ZIP2PARQUET_PYTHON_VERSION) .transforms.set-versions

build-dist:: .defaults.build-dist

Expand All @@ -46,8 +46,8 @@ run-cli-sample:
RUN_ARGS=" \
--data_local_config \" { 'input_folder' : '../test-data/input', 'output_folder' : '../output' } \" \
--data_files_to_use \"['.zip']\" \
--code2parquet_supported_langs_file ../test-data/languages/lang_extensions.json \
--code2parquet_detect_programming_lang True " \
--zip2parquet_supported_langs_file ../test-data/languages/lang_extensions.json \
--zip2parquet_detect_programming_lang True " \
.transforms.run-src-file

run-local-sample: .transforms.run-local-sample
Expand Down
Loading

0 comments on commit 22e7636

Please sign in to comment.