Skip to content

Commit

Permalink
Allow lambda to read from LPDAAC reconciliation reports bucket (#2)
Browse files Browse the repository at this point in the history
* Add permission to read LPDAAC reconciliation bucket
* Remove deployment to dev/uat: LPDAAC does not have dev/uat infrastructure to test against
  • Loading branch information
chuckwondo authored Oct 29, 2024
1 parent 80709c2 commit d8c4ed1
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 52 deletions.
14 changes: 0 additions & 14 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -142,20 +142,6 @@ jobs:
run: |
make destroy-it
deploy-dev:
# Deploy to Dev only on push (including merged PR) to `develop` branch
if: github.event_name == 'push' && github.event.ref == 'refs/heads/develop'
needs:
- config
- unit-tests
- integration-tests
uses: ./.github/workflows/deploy.yml
with:
environment: dev
PYTHON_VERSION: "${{ needs.config.outputs.PYTHON_VERSION }}"
TOX_MIN_VERSION: "${{ needs.config.outputs.TOX_MIN_VERSION }}"
secrets: inherit

deploy-prod:
# Deploy to Prod only on publishing a release (tag) on `main` branch
if: github.event_name == 'release' && github.event.action == 'published'
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ install-node: tox
# Install the CDK CLI within the tox virtualenv, if it's not installed or it's the wrong version.
install-cdk: tox install-node
@if [[ ! $$(type cdk 2>/dev/null) =~ $${VIRTUAL_ENV} ]]; then \
set -x; npm install --location global "aws-cdk@v2.*"; \
set -x; npm install --location global "aws-cdk@latest"; \
fi

## venv: Create Python virtual environment in directory `venv`
Expand Down
2 changes: 2 additions & 0 deletions cdk/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

stack_name = os.environ["HLS_LPDAAC_STACK"]
inventory_reports_bucket = os.environ["HLS_LPDAAC_INVENTORY_REPORTS_BUCKET"]
reconciliation_reports_bucket = os.environ["HLS_LPDAAC_RECONCILIATION_REPORTS_BUCKET"]
forward_bucket = os.environ["HLS_LPDAAC_FORWARD_BUCKET"]
historical_bucket = os.environ["HLS_LPDAAC_HISTORICAL_BUCKET"]
request_topic_arn = os.environ["HLS_LPDAAC_REQUEST_TOPIC_ARN"]
Expand All @@ -21,6 +22,7 @@
hls_historical_bucket=historical_bucket,
lpdaac_request_topic_arn=request_topic_arn,
lpdaac_response_topic_arn=response_topic_arn,
lpdaac_reconciliation_reports_bucket=reconciliation_reports_bucket,
managed_policy_name=managed_policy_name,
)

Expand Down
3 changes: 2 additions & 1 deletion cdk/app_it.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
managed_policy_name=managed_policy_name,
)

stack = HlsLpdaacReconciliationStack(
HlsLpdaacReconciliationStack(
app,
f"{stack_name}-lpdaac-reconciliation-it",
hls_inventory_reports_bucket=stack_it.hls_inventory_reports_bucket.bucket_name,
hls_forward_bucket=stack_it.hls_forward_bucket.bucket_name,
hls_historical_bucket=stack_it.hls_historical_bucket.bucket_name,
lpdaac_request_topic_arn=stack_it.lpdaac_request_topic.topic_arn,
lpdaac_response_topic_arn=stack_it.lpdaac_response_topic.topic_arn,
lpdaac_reconciliation_reports_bucket=stack_it.lpdaac_reports_bucket.bucket_name,
managed_policy_name=managed_policy_name,
)

Expand Down
7 changes: 5 additions & 2 deletions cdk/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def __init__(
hls_historical_bucket: str,
lpdaac_request_topic_arn: str,
lpdaac_response_topic_arn: str,
lpdaac_reconciliation_reports_bucket: str,
managed_policy_name: Optional[str] = None,
**kwargs,
) -> None:
Expand All @@ -42,7 +43,6 @@ def __init__(
# ----------------------------------------------------------------------

# Bucket where HLS inventory reports are written.
s3.Bucket.from_bucket_attributes
inventory_reports_bucket = s3.Bucket.from_bucket_name(
self, "HlsInventoryReportsBucket", hls_inventory_reports_bucket
)
Expand Down Expand Up @@ -101,10 +101,13 @@ def __init__(
sources.SnsEventSource(lpdaac_response_topic)
)

# Allow lambda function to read/write forward and historical buckets
# Allow lambda function to access buckets
s3.Bucket.from_bucket_name(
self, "HlsForwardBucket", hls_forward_bucket
).grant_read_write(lpdaac_response_lambda)
s3.Bucket.from_bucket_name(
self, "HlsHistoricalBucket", hls_historical_bucket
).grant_read_write(lpdaac_response_lambda)
s3.Bucket.from_bucket_name(
self, "LpdaacReconciliationReports", lpdaac_reconciliation_reports_bucket
).grant_read(lpdaac_response_lambda)
50 changes: 25 additions & 25 deletions cdk/stack_it.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,19 @@ def __init__(
)
)

self.hls_inventory_reports_bucket = s3.Bucket(
self,
"HlsInventoryReports",
auto_delete_objects=True,
removal_policy=RemovalPolicy.DESTROY,
)
self.hls_forward_bucket = s3.Bucket(
self,
"HlsForward",
auto_delete_objects=True,
removal_policy=RemovalPolicy.DESTROY,
)
self.hls_historical_bucket = s3.Bucket(
self,
"HlsHistorical",
auto_delete_objects=True,
removal_policy=RemovalPolicy.DESTROY,
)
self.hls_inventory_reports_bucket = self.make_bucket("HlsInventoryReports")
self.hls_forward_bucket = self.make_bucket("HlsForward")
self.hls_historical_bucket = self.make_bucket("HlsHistorical")
self.lpdaac_reports_bucket = self.make_bucket("LpdaacReconciliationReports")

self.lpdaac_request_topic = sns.Topic(self, "LpdaacRequestTopic")
self.lpdaac_request_queue = sqs.Queue(self, "LpdaacRequestQueue")
self.lpdaac_response_topic = sns.Topic(self, "LpdaacResponseTopic")

# Subscribe a queue to the topic so we can receive messages from the queue
# to confirm that an message was sent to the topic via the lambda handler.
lpdaac_request_queue = sqs.Queue(self, "LpdaacRequestQueue")
self.lpdaac_request_topic.add_subscription(
subs.SqsSubscription(self.lpdaac_request_queue)
subs.SqsSubscription(lpdaac_request_queue)
)

# Set outputs for use within integration tests
Expand All @@ -67,11 +53,6 @@ def __init__(
"HlsInventoryReportsBucketName",
value=self.hls_inventory_reports_bucket.bucket_name,
)
CfnOutput(
self,
"LpdaacRequestQueueUrl",
value=self.lpdaac_request_queue.queue_url,
)
CfnOutput(
self,
"HlsForwardBucketName",
Expand All @@ -82,8 +63,27 @@ def __init__(
"HlsHistoricalBucketName",
value=self.hls_historical_bucket.bucket_name,
)

CfnOutput(
self,
"LpdaacReconciliationReportsBucketName",
value=self.lpdaac_reports_bucket.bucket_name,
)
CfnOutput(
self,
"LpdaacRequestQueueUrl",
value=lpdaac_request_queue.queue_url,
)
CfnOutput(
self,
"LpdaacResponseTopicArn",
value=self.lpdaac_response_topic.topic_arn,
)

def make_bucket(self, construct_id: str) -> s3.Bucket:
return s3.Bucket(
self,
construct_id,
auto_delete_objects=True,
removal_policy=RemovalPolicy.DESTROY,
)
25 changes: 16 additions & 9 deletions tests/integration/test_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,37 @@ def wait_until_modified(
def test_response_handler(
cdk_outputs: Mapping[str, str], s3: S3Client, sns: SNSClient
) -> None:
bucket = cdk_outputs["HlsForwardBucketName"]
hls_bucket = cdk_outputs["HlsForwardBucketName"]
lpdaac_bucket = cdk_outputs["LpdaacReconciliationReportsBucketName"]
topic_arn = cdk_outputs["LpdaacResponseTopicArn"]

# Write trigger file (contents don't matter; we just need a file to "touch")
trigger_key = "S30/data/2124237/HLS.S30.T15XWH.2124237T194859.v2.0/HLS.S30.T15XWH.2124237T194859.v2.0.json"
s3.put_object(Bucket=bucket, Key=trigger_key, Body="{}".encode())
s3.put_object(Bucket=hls_bucket, Key=trigger_key, Body=bytes())
# Read trigger file to get "original" timestamp
trigger_object = s3.head_object(Bucket=bucket, Key=trigger_key)
trigger_object = s3.head_object(Bucket=hls_bucket, Key=trigger_key)
created = trigger_object["LastModified"]

# Write reconciliation report file
# Write reconciliation report file to the LPDAAC bucket
report_fixture = Path("tests") / "fixtures" / "HLS_reconcile_2024239_2.0.json"
report_key = "reports/HLS_reconcile_2024239_2.0.json"
s3.put_object(Bucket=bucket, Key=report_key, Body=report_fixture.read_bytes())
s3.put_object(
Bucket=lpdaac_bucket, Key=report_key, Body=report_fixture.read_bytes()
)

# Send message to topic about the report
# Send message to topic about the report in the LPDAAC bucket, which should
# trigger "response" lambda with the message, causing the lambda to then
# "touch" the trigger file in the HLS bucket.
message_fixture = Path("tests") / "fixtures" / "message-discrepancies.txt"
message = message_fixture.read_text().format(bucket=bucket)
message = message_fixture.read_text().format(bucket=lpdaac_bucket)
sns.publish(Message=message, TopicArn=topic_arn)

# Read trigger file to get "touched" timestamp
touched = wait_until_modified(s3, since=created, bucket=bucket, key=trigger_key)
touched = wait_until_modified(s3, since=created, bucket=hls_bucket, key=trigger_key)

# Count trigger files to make sure no additional trigger files were created
key_count = s3.list_objects_v2(Bucket=bucket, Prefix="S30/", MaxKeys=2)["KeyCount"]
key_count = s3.list_objects_v2(Bucket=hls_bucket, Prefix="S30/", MaxKeys=2)[
"KeyCount"
]

assert touched > created and key_count == 1

0 comments on commit d8c4ed1

Please sign in to comment.