Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactoring code2parquet to zip2parquet #525

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .make.versions
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ DOC_QUALITY_RAY_VERSION=$(DPK_VERSION)
CODE_QUALITY_RAY_VERSION=$(DPK_VERSION)
CODE_QUALITY_PYTHON_VERSION=$(DPK_VERSION)

CODE2PARQUET_PYTHON_VERSION=$(DPK_VERSION)
CODE2PARQUET_RAY_VERSION=$(DPK_VERSION)
ZIP2PARQUET_PYTHON_VERSION=$(DPK_VERSION)
ZIP2PARQUET_RAY_VERSION=$(DPK_VERSION)
INGEST_TO_PARQUET_VERSION=$(DPK_VERSION)
REPO_LVL_ORDER_RAY_VERSION=$(DPK_VERSION)

Expand Down
2 changes: 1 addition & 1 deletion kfp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
|-------------------------------------|:----------------------------------------------------------------------------------:|
| language/lang_id | [lang_id_wf.py](../transforms/language/lang_id/kfp_ray/lang_id_wf.py) |
| code/malware | [malware_wf.py](../transforms/code/malware/kfp_ray/malware_wf.py) |
| code/code2parquet | [code2parquet_wf.py](../transforms/code/code2parquet/kfp_ray/code2parquet_wf.py) |
| universal/zip2parquet | [zip2parquet_wf.py](../transforms/universal/zip2parquet/kfp_ray/zip2parquet_wf.py) |
| code/code_quality | [code_quality_wf.py](../transforms/code/code_quality/kfp_ray/code_quality_wf.py) |
| code/proglang_select | [proglang_select_wf.py](../transforms/code/proglang_select/kfp_ray/proglang_select_wf.py) |
| universal/doc_id | [doc_id_wf.py](../transforms/universal/doc_id/kfp_ray/doc_id_wf.py) |
Expand Down
2 changes: 1 addition & 1 deletion tools/ingest2parquet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

**Please note: This tool is deprecated and will be removed soon.
It is superseded by the transform-based implementation,
[code2parquet](../../transforms/code/code2parquet), providing identical capability,
[zip2parquet](../../transforms/universal/zip2parquet), providing identical capability,
but with support for ray-based scalability.**

## Summary
Expand Down

This file was deleted.

40 changes: 0 additions & 40 deletions transforms/code/code2parquet/ray/test-data/expected/metadata.json

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


## Summary
This project allows execution of the [noop Ray transform](../ray) as a
This project allows execution of the [zip2parquet Ray transform](../ray) as a
[KubeFlow Pipeline](https://www.kubeflow.org/docs/components/pipelines/overview/)

The detail pipeline is presented in the [Simplest Transform pipeline tutorial](../../../../kfp/doc/simple_transform_pipeline.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@


# the name of the job script
EXEC_SCRIPT_NAME: str = "code2parquet_transform_ray.py"
EXEC_SCRIPT_NAME: str = "zip2parquet_transform_ray.py"

task_image = "quay.io/dataprep1/data-prep-kit/code2parquet-ray:latest"
task_image = "quay.io/dataprep1/data-prep-kit/zip2parquet-ray:latest"


# components
Expand All @@ -42,10 +42,12 @@ def compute_exec_params_func(
runtime_pipeline_id: str,
runtime_job_id: str,
runtime_code_location: dict,
code2parquet_supported_langs_file: str,
code2parquet_domain: str,
code2parquet_snapshot: str,
code2parquet_detect_programming_lang: bool,
zip2parquet_code_data: bool,
zip2parquet_programming_language_column: str,
zip2parquet_supported_langs_file: str,
zip2parquet_domain: str,
zip2parquet_snapshot: str,
zip2parquet_detect_programming_lang: bool,
) -> dict:
from runtime_utils import KFPUtils

Expand All @@ -59,10 +61,12 @@ def compute_exec_params_func(
"runtime_pipeline_id": runtime_pipeline_id,
"runtime_job_id": runtime_job_id,
"runtime_code_location": str(runtime_code_location),
"code2parquet_supported_langs_file": code2parquet_supported_langs_file,
"code2parquet_domain": code2parquet_domain,
"code2parquet_snapshot": code2parquet_snapshot,
"code2parquet_detect_programming_lang": code2parquet_detect_programming_lang,
"zip2parquet_code_data": zip2parquet_code_data,
"zip2parquet_programming_language_column": zip2parquet_programming_language_column,
"zip2parquet_supported_langs_file": zip2parquet_supported_langs_file,
"zip2parquet_domain": zip2parquet_domain,
"zip2parquet_snapshot": zip2parquet_snapshot,
"zip2parquet_detect_programming_lang": zip2parquet_detect_programming_lang,
}


Expand Down Expand Up @@ -97,22 +101,22 @@ def compute_exec_params_func(
# clean up Ray
cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml")
# Task name is part of the pipeline name, the ray cluster name and the job name in DMF.
TASK_NAME: str = "code2parquet"
PREFIX: str = "code2parquet"
TASK_NAME: str = "zip2parquet"
PREFIX: str = "zip2parquet"


@dsl.pipeline(
name=TASK_NAME + "-ray-pipeline",
description="Pipeline for converting zip files to parquet",
)
def code2parquet(
ray_name: str = "code2parquet-kfp-ray", # name of Ray cluster
def zip2parquet(
ray_name: str = "zip2parquet-kfp-ray", # name of Ray cluster
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
# data access
data_s3_config: str = "{'input_folder': 'test/code2parquet/input', 'output_folder': 'test/code2parquet/output/'}",
data_s3_config: str = "{'input_folder': 'test/zip2parquet/input', 'output_folder': 'test/zip2parquet/output/'}",
data_s3_access_secret: str = "s3-secret",
data_max_files: int = -1,
data_num_samples: int = -1,
Expand All @@ -121,12 +125,14 @@ def code2parquet(
runtime_actor_options: dict = {'num_cpus': 0.8},
runtime_pipeline_id: str = "pipeline_id",
runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
# code to parquet
code2parquet_supported_langs_file: str = "test/code2parquet/languages/lang_extensions.json",
code2parquet_detect_programming_lang: bool = True,
code2parquet_domain: str = "code",
code2parquet_snapshot: str = "github",
code2parquet_s3_access_secret: str = "s3-secret",
# zip to parquet
zip2parquet_code_data: bool = True,
zip2parquet_programming_language_column: str = "programming_language",
zip2parquet_supported_langs_file: str = "test/zip2parquet/languages/lang_extensions.json",
zip2parquet_detect_programming_lang: bool = True,
zip2parquet_domain: str = "code",
zip2parquet_snapshot: str = "github",
zip2parquet_s3_access_secret: str = "s3-secret",
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
) -> None:
Expand Down Expand Up @@ -162,11 +168,13 @@ def code2parquet(
:param runtime_actor_options - actor options
:param runtime_pipeline_id - pipeline id
:param runtime_code_location - code location
:param code2parquet_supported_langs_file - file to store allowed languages
:param code2parquet_detect_programming_lang - detect programming language flag
:param code2parquet_domain: domain
:param code2parquet_snapshot: snapshot
:param code2parquet_s3_access_secret - ingest to parquet s3 access secret
:param zip2parquet_code_data - flag that data is code
:param zip2parquet_programming_language_column - name for programming language column
:param zip2parquet_supported_langs_file - file to store allowed languages
:param zip2parquet_detect_programming_lang - detect programming language flag
:param zip2parquet_domain: domain
:param zip2parquet_snapshot: snapshot
:param zip2parquet_s3_access_secret - ingest to parquet s3 access secret
(here we are assuming that select language info is in S3, but potentially in the different bucket)
:return: None
"""
Expand All @@ -186,10 +194,12 @@ def code2parquet(
runtime_pipeline_id=runtime_pipeline_id,
runtime_job_id=run_id,
runtime_code_location=runtime_code_location,
code2parquet_supported_langs_file=code2parquet_supported_langs_file,
code2parquet_domain=code2parquet_domain,
code2parquet_snapshot=code2parquet_snapshot,
code2parquet_detect_programming_lang=code2parquet_detect_programming_lang,
zip2parquet_code_data=zip2parquet_code_data,
zip2parquet_programming_language_column=zip2parquet_programming_language_column,
zip2parquet_supported_langs_file=zip2parquet_supported_langs_file,
zip2parquet_domain=zip2parquet_domain,
zip2parquet_snapshot=zip2parquet_snapshot,
zip2parquet_detect_programming_lang=zip2parquet_detect_programming_lang,
)
ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
# start Ray cluster
Expand All @@ -216,10 +226,10 @@ def code2parquet(
)
ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC)
ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret)
ComponentUtils.set_s3_env_vars_to_component(execute_job, code2parquet_s3_access_secret, prefix=PREFIX)
ComponentUtils.set_s3_env_vars_to_component(execute_job, zip2parquet_s3_access_secret, prefix=PREFIX)
execute_job.after(ray_cluster)


if __name__ == "__main__":
# Compiling the pipeline
compiler.Compiler().compile(code2parquet, __file__.replace(".py", ".yaml"))
compiler.Compiler().compile(zip2parquet, __file__.replace(".py", ".yaml"))
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
test-data/output
output/*
output/metadata.json
/output/
data-processing-lib/

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ RUN cd data-processing-lib-python && pip install --no-cache-dir -e .

# END OF STEPS destined for a data-prep-kit base image

COPY --chown=dpk:root src/ src/
COPY --chown=dpk:root src src/
COPY --chown=dpk:root pyproject.toml pyproject.toml
RUN pip install --no-cache-dir -e .

# copy the main() entry point to the image
COPY ./src/code2parquet_transform_python.py .
COPY src/zip2parquet_transform_python.py .

# copy some of the samples in
COPY ./src/code2parquet_local.py local/
COPY src/zip2parquet_local.py local/

# copy test
COPY test/ test/
COPY test-data/ test-data/
COPY test test/
COPY test-data test-data/

# Set environment
ENV PYTHONPATH /home/dpk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ REPOROOT=../../../..

# $(REPOROOT)/.make.versions file contains the versions

TRANSFORM_NAME=code2parquet
TRANSFORM_NAME=zip2parquet

include $(REPOROOT)/transforms/.make.transforms

Expand All @@ -33,7 +33,7 @@ setup:: .transforms.setup

# distribution versions is the same as image version.
set-versions:
$(MAKE) TRANSFORM_PYTHON_VERSION=$(CODE2PARQUET_PYTHON_VERSION) TOML_VERSION=$(CODE2PARQUET_PYTHON_VERSION) .transforms.set-versions
$(MAKE) TRANSFORM_PYTHON_VERSION=$(ZIP2PARQUET_PYTHON_VERSION) TOML_VERSION=$(ZIP2PARQUET_PYTHON_VERSION) .transforms.set-versions

build-dist:: .defaults.build-dist

Expand All @@ -46,8 +46,8 @@ run-cli-sample:
RUN_ARGS=" \
--data_local_config \" { 'input_folder' : '../test-data/input', 'output_folder' : '../output' } \" \
--data_files_to_use \"['.zip']\" \
--code2parquet_supported_langs_file ../test-data/languages/lang_extensions.json \
--code2parquet_detect_programming_lang True " \
--zip2parquet_supported_langs_file ../test-data/languages/lang_extensions.json \
--zip2parquet_detect_programming_lang True " \
.transforms.run-src-file

run-local-sample: .transforms.run-local-sample
Expand Down
Loading