From b073c58f0aa92f3655bd1d1a0b6f50c210a9800d Mon Sep 17 00:00:00 2001
From: Nathan Zimmerman
Date: Mon, 11 Sep 2023 15:55:02 -0500
Subject: [PATCH 1/2] Fix task deployment, clean up code, enable transfer to
 non-standard bucket names

---
 .../groups/transfer_group.py                  |  2 +-
 docker_tasks/cogify_transfer/handler.py       | 90 ++++++++++++-------
 docker_tasks/cogify_transfer/requirements.txt |  2 +-
 infrastructure/task_definition.tf             | 29 ++++++
 4 files changed, 90 insertions(+), 33 deletions(-)

diff --git a/dags/veda_data_pipeline/groups/transfer_group.py b/dags/veda_data_pipeline/groups/transfer_group.py
index 62065d7e..1547e5f9 100644
--- a/dags/veda_data_pipeline/groups/transfer_group.py
+++ b/dags/veda_data_pipeline/groups/transfer_group.py
@@ -56,7 +56,7 @@ def subdag_transfer():
         overrides={
             "containerOverrides": [
                 {
-                    "name": f"{mwaa_stack_conf.get('PREFIX')}-veda-cogify-transfer",
+                    "name": f"{mwaa_stack_conf.get('PREFIX')}-veda-cogify_transfer",
                     "command": [
                         "/usr/local/bin/python",
                         "handler.py",
diff --git a/docker_tasks/cogify_transfer/handler.py b/docker_tasks/cogify_transfer/handler.py
index 1f233658..de90b96e 100644
--- a/docker_tasks/cogify_transfer/handler.py
+++ b/docker_tasks/cogify_transfer/handler.py
@@ -1,9 +1,14 @@
+import ast
+import json
 import os
 import re
 import tempfile
+from argparse import ArgumentParser
+from time import sleep, time
 
 import boto3
 from rio_cogeo.cogeo import cog_translate
+from rio_cogeo.profiles import cog_profiles
 
 
 def assume_role(role_arn, session_name="veda-airflow-pipelines_transfer_files"):
@@ -12,12 +17,7 @@ def assume_role(role_arn, session_name="veda-airflow-pipelines_transfer_files"):
         RoleArn=role_arn,
         RoleSessionName=session_name,
     )
-    creds = credentials["Credentials"]
-    return {
-        "aws_access_key_id": creds["AccessKeyId"],
-        "aws_secret_access_key": creds.get("SecretAccessKey"),
-        "aws_session_token": creds.get("SessionToken"),
-    }
+    return credentials["Credentials"]
 
 
 def get_matching_files(s3_client, bucket, prefix, regex_pattern):
@@ -42,44 +42,72 @@ def get_matching_files(s3_client, bucket, prefix, regex_pattern):
     return matching_files
 
 
-def transfer_file(s3_client, file_key, local_file_path, destination_bucket, collection):
-    filename = file_key.split("/")[-1]
-    target_key = f"{collection}/{filename}"
-    s3_client.upload_file(local_file_path, destination_bucket, target_key)
-
-
-def cogify_transfer_handler(event, context):
-    external_role_arn = os.environ["EXTERNAL_ROLE_ARN"]
-    creds = assume_role(external_role_arn, "veda-data-pipelines_data-transfer")
-    kwargs = {
-        "aws_access_key_id": creds["AccessKeyId"],
-        "aws_secret_access_key": creds["SecretAccessKey"],
-        "aws_session_token": creds["SessionToken"],
-    }
-    source_s3 = boto3.client("s3")
-    target_s3 = boto3.client("s3", **kwargs)
-
+def cogify_transfer_handler(event):
     origin_bucket = event.get("origin_bucket")
     origin_prefix = event.get("origin_prefix")
     regex_pattern = event.get("filename_regex")
     target_bucket = event.get("target_bucket", "veda-data-store-staging")
     collection = event.get("collection")
+    cog_profile = event.get("cog_profile", "deflate")
+    dry_run = event.get("dry_run", False)
+
+    source_s3 = boto3.client("s3")
+    if target_bucket == "veda-data-store-staging":
+        external_role_arn = os.environ["EXTERNAL_ROLE_ARN"]
+        creds = assume_role(external_role_arn, "veda-data-pipelines_data-transfer")
+        kwargs = {
+            "aws_access_key_id": creds["AccessKeyId"],
+            "aws_secret_access_key": creds["SecretAccessKey"],
+            "aws_session_token": creds["SessionToken"],
+        }
+        target_s3 = boto3.client("s3", **kwargs)
+    else:
+        target_s3 = boto3.client("s3")
+
+    dst_profile = cog_profiles.get(cog_profile)
 
     matching_files = get_matching_files(
         source_s3, origin_bucket, origin_prefix, regex_pattern
     )
-    if not event.get("dry_run"):
+    if not dry_run:
         for origin_key in matching_files:
-            with tempfile.NamedTemporaryFile() as local_tif, tempfile.NamedTemporaryFile() as local_cog:
+            with tempfile.NamedTemporaryFile(delete=False) as local_tif, tempfile.NamedTemporaryFile(delete=False) as local_cog:
                 local_tif_path = local_tif.name
                 local_cog_path = local_cog.name
                 source_s3.download_file(origin_bucket, origin_key, local_tif_path)
-                cog_translate(local_tif_path, local_cog_path, quiet=True)
+                local_tif.close()
+                cog_translate(local_tif_path, local_cog_path, dst_profile, quiet=True)
+                local_cog.close()
                 filename = origin_key.split("/")[-1]
                 destination_key = f"{collection}/{filename}"
                 target_s3.upload_file(local_cog_path, target_bucket, destination_key)
-    else:
-        print(
-            f"Would have copied {len(matching_files)} files from {origin_bucket} to {target_bucket}"
-        )
-        print(f"Files matched: {matching_files}")
+
+            # Manually delete the temporary files created with delete=False
+            os.remove(local_tif_path)
+            os.remove(local_cog_path)
+
+    return {"matching_files": matching_files, "dry_run": dry_run}
+
+
+if __name__ == "__main__":
+    parser = ArgumentParser(
+        prog="cogify_transfer",
+        description="Cogify and transfer files on S3",
+    )
+    parser.add_argument(
+        "--payload", dest="payload", help="event passed to cogify_transfer_handler"
+    )
+    args = parser.parse_args()
+    # For CloudWatch logging to work, the task should stay alive for at least 30 s
+    start = time()
+    print(f"Start at {start}")
+
+    payload_event = ast.literal_eval(args.payload)
+    cogify_transfer_response = cogify_transfer_handler(payload_event)
+    response = json.dumps({**payload_event, **cogify_transfer_response})
+    elapsed = time() - start
+    print(f"Actual processing took {elapsed:.2f} seconds")
+    # Pad the runtime to 50 s so CloudWatch has time to deliver the logs
+    if elapsed < 50:
+        sleep(50)
+    print(response)
diff --git a/docker_tasks/cogify_transfer/requirements.txt b/docker_tasks/cogify_transfer/requirements.txt
index 1a9fc916..56e091b1 100644
--- a/docker_tasks/cogify_transfer/requirements.txt
+++ b/docker_tasks/cogify_transfer/requirements.txt
@@ -3,7 +3,7 @@ awslambdaric
 boto3
 pystac==1.4.0
 python-cmr
-rasterio==1.3.0
+rasterio==1.3.3
 rio-cogeo==4.0.0
 shapely
 smart-open==6.3.0
diff --git a/infrastructure/task_definition.tf b/infrastructure/task_definition.tf
index e63e679f..a9f797da 100644
--- a/infrastructure/task_definition.tf
+++ b/infrastructure/task_definition.tf
@@ -56,3 +56,32 @@ resource "aws_ecs_task_definition" "veda_vector_task_definition" {
   cpu                      = 2048
   memory                   = 4096
 }
+
+resource "aws_ecs_task_definition" "veda_transfer_task_definition" {
+
+
+  container_definitions = jsonencode([
+
+    {
+      name      = "${var.prefix}-veda-cogify_transfer"
+      image     = "${local.account_id}.dkr.ecr.${local.aws_region}.amazonaws.com/${var.prefix}-veda-cogify_transfer"
+      essential = true,
+      logConfiguration = {
+        "logDriver" : "awslogs",
+        "options" : {
+          "awslogs-group" : module.mwaa.log_group_name,
+          "awslogs-region" : local.aws_region,
+          "awslogs-stream-prefix" : "ecs"
+        }
+      }
+    }
+
+  ])
+  family                   = "${var.prefix}-tasks"
+  requires_compatibilities = ["FARGATE"]
+  network_mode             = "awsvpc"
+  execution_role_arn       = module.mwaa.mwaa_role_arn
+  task_role_arn            = module.mwaa.mwaa_role_arn
+  cpu                      = 2048
+  memory                   = 4096
+}
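Notes on patch 1: the ECS task now invokes handler.py directly and receives its
event as a --payload string, which the entrypoint parses with ast.literal_eval
rather than json.loads, so the payload has to be a valid Python literal. A
minimal sketch of that parsing step; the payload values here are placeholders,
not real VEDA buckets:

    # Sketch of the entrypoint's payload parsing; values are placeholders.
    import ast

    payload = "{'origin_bucket': 'example-bucket', 'dry_run': True}"
    event = ast.literal_eval(payload)  # Python literals: True/False, not true/false
    assert event["dry_run"] is True

A JSON-style payload with lowercase true/false would make ast.literal_eval
raise ValueError, which is worth remembering when assembling the command list
in transfer_group.py.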
boto3.client("s3", **kwargs) + else: + target_s3 = boto3.client("s3") + + dst_profile = cog_profiles.get(cog_profile) matching_files = get_matching_files( source_s3, origin_bucket, origin_prefix, regex_pattern ) - if not event.get("dry_run"): + if not dry_run: for origin_key in matching_files: - with tempfile.NamedTemporaryFile() as local_tif, tempfile.NamedTemporaryFile() as local_cog: + with tempfile.NamedTemporaryFile(delete=False) as local_tif, tempfile.NamedTemporaryFile(delete=False) as local_cog: local_tif_path = local_tif.name local_cog_path = local_cog.name source_s3.download_file(origin_bucket, origin_key, local_tif_path) - cog_translate(local_tif_path, local_cog_path, quiet=True) + local_tif.close() + cog_translate(local_tif_path, local_cog_path, dst_profile, quiet=True) + local_cog.close() filename = origin_key.split("/")[-1] destination_key = f"{collection}/{filename}" target_s3.upload_file(local_cog_path, target_bucket, destination_key) - else: - print( - f"Would have copied {len(matching_files)} files from {origin_bucket} to {target_bucket}" - ) - print(f"Files matched: {matching_files}") + + # Manually delete the temporary files + os.remove(local_tif_path) + os.remove(local_cog_path) + + return {"matching_files": matching_files, "dry_run": dry_run} + + +if __name__ == "__main__": + parser = ArgumentParser( + prog="cogify_transfer", + description="Cogify and transfer files on S3", + ) + parser.add_argument( + "--payload", dest="payload", help="event passed to stac_handler function" + ) + args = parser.parse_args() + # For cloud watch log to work the task should stay alive for at least 30 s + start = time() + print(f"Start at {start}") + + payload_event = ast.literal_eval(args.payload) + cogify_transfer_response = cogify_transfer_handler(payload_event) + response = json.dumps({**payload_event, **cogify_transfer_response}) + end = time() - start + print(f"Actual processing took {end:.2f} seconds") + # Check if it took less than 50 seconds + if end - start < 50: + sleep(50) + print(response) diff --git a/docker_tasks/cogify_transfer/requirements.txt b/docker_tasks/cogify_transfer/requirements.txt index 1a9fc916..56e091b1 100644 --- a/docker_tasks/cogify_transfer/requirements.txt +++ b/docker_tasks/cogify_transfer/requirements.txt @@ -3,7 +3,7 @@ awslambdaric boto3 pystac==1.4.0 python-cmr -rasterio==1.3.0 +rasterio==1.3.3 rio-cogeo==4.0.0 shapely smart-open==6.3.0 diff --git a/infrastructure/task_definition.tf b/infrastructure/task_definition.tf index e63e679f..a9f797da 100644 --- a/infrastructure/task_definition.tf +++ b/infrastructure/task_definition.tf @@ -56,3 +56,32 @@ resource "aws_ecs_task_definition" "veda_vector_task_definition" { cpu = 2048 memory = 4096 } + +resource "aws_ecs_task_definition" "veda_transfer_task_definition" { + + + container_definitions = jsonencode([ + + { + name = "${var.prefix}-veda-cogify_transfer" + image = "${local.account_id}.dkr.ecr.${local.aws_region}.amazonaws.com/${var.prefix}-veda-cogify_transfer" + essential = true, + logConfiguration = { + "logDriver" : "awslogs", + "options" : { + "awslogs-group" : module.mwaa.log_group_name, + "awslogs-region" : local.aws_region, + "awslogs-stream-prefix" : "ecs" + } + } + } + + ]) + family = "${var.prefix}-tasks" + requires_compatibilities = ["FARGATE"] + network_mode = "awsvpc" + execution_role_arn = module.mwaa.mwaa_role_arn + task_role_arn = module.mwaa.mwaa_role_arn + cpu = 2048 + memory = 4096 +} From 56eb1243fc3db0d2d8059489501d789d69204a0a Mon Sep 17 00:00:00 2001 From: Nathan Zimmerman Date: 
 docker_tasks/cogify_transfer/handler.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/docker_tasks/cogify_transfer/handler.py b/docker_tasks/cogify_transfer/handler.py
index de90b96e..09d8a623 100644
--- a/docker_tasks/cogify_transfer/handler.py
+++ b/docker_tasks/cogify_transfer/handler.py
@@ -71,7 +71,9 @@ def cogify_transfer_handler(event):
     )
     if not dry_run:
         for origin_key in matching_files:
-            with tempfile.NamedTemporaryFile(delete=False) as local_tif, tempfile.NamedTemporaryFile(delete=False) as local_cog:
+            with tempfile.NamedTemporaryFile(
+                delete=False
+            ) as local_tif, tempfile.NamedTemporaryFile(delete=False) as local_cog:
                 local_tif_path = local_tif.name
                 local_cog_path = local_cog.name
                 source_s3.download_file(origin_bucket, origin_key, local_tif_path)
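With both patches applied, the non-standard-bucket path can be exercised
safely by importing the handler and passing a dry-run event. A minimal sketch,
assuming handler.py is on the import path and AWS credentials plus a default
region are configured; every bucket, prefix, and collection name below is a
placeholder:

    # Hypothetical dry run of cogify_transfer_handler; all names are placeholders.
    from handler import cogify_transfer_handler

    event = {
        "origin_bucket": "example-origin-bucket",
        "origin_prefix": "delivery/2023-09/",
        "filename_regex": r".*\.tif$",
        # Any bucket other than veda-data-store-staging skips the role
        # assumption and uses the task's own credentials.
        "target_bucket": "example-target-bucket",
        "collection": "example-collection",
        "dry_run": True,  # list matching files only; nothing is transferred
    }
    print(cogify_transfer_handler(event))

With dry_run set, the handler only lists the keys under origin_prefix that
match filename_regex and returns them, so this is a cheap way to validate the
prefix and regex before running a real transfer.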