Update template distribution (#4)
* Fix package data inclusion

* Fix getting outputs of an erroneously deployed stack

* Remove ResponseMetadata from configuration

* Remove ResponseMetadata from API response

* Update data-capture destination

* Fix contents lookup

* Fix typo

* Remove stack outputs caching

* Add CopyZips function

* Add template to package distribution

* Remove template URL

* Convert traffic_shadowing to package

* Update source bucket

* Rename function parameters

* Update IAM policy

* Update distribution bucket
tidylobster authored Apr 28, 2020
1 parent 7540f56 commit 6e4068e
Showing 21 changed files with 432 additions and 231 deletions.
1 change: 0 additions & 1 deletion .gitignore
@@ -247,5 +247,4 @@ $RECYCLE.BIN/
.idea/
.envrc
events/
template_version.txt
test-report.xml
57 changes: 56 additions & 1 deletion README.md
@@ -8,6 +8,61 @@ This repository contains implementations of integrations with AWS Sagemaker.
$ pip install hydro-integrations
```

## Before you start

The following permissions are required to deploy the traffic-shadowing stack.

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "iam:GetRole",
                "iam:CreateRole",
                "iam:DeleteRole",
                "iam:GetRolePolicy",
                "iam:PutRolePolicy",
                "iam:DeleteRolePolicy",
                "iam:PassRole",
                "lambda:CreateFunction",
                "lambda:DeleteFunction",
                "lambda:InvokeFunction",
                "lambda:ListVersionsByFunction",
                "lambda:GetFunctionConfiguration",
                "lambda:PutFunctionConcurrency",
                "lambda:AddPermission",
                "lambda:RemovePermission",
                "lambda:PublishVersion",
                "cloudformation:DescribeStacks",
                "cloudformation:DescribeStackEvents",
                "cloudformation:CreateStack",
                "cloudformation:DeleteStack"
            ],
            "Resource": [
                "arn:aws:iam:::role/*",
                "arn:aws:cloudformation:::stack/*/*",
                "arn:aws:lambda:::function:*"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "s3:PutBucketNotification",
                "s3:CreateBucket",
                "s3:DeleteBucket",
                "s3:GetBucketLocation",
                "s3:GetBucketNotification"
            ],
            "Resource": "*"
        }
    ]
}
```
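
If you prefer to set these permissions up programmatically, the sketch below shows one way to do it with boto3. It assumes the policy above is saved as `policy.json`; the user name `deployer` and the policy name `traffic-shadowing-deploy` are placeholders for your own values.

```python
import boto3

# Attach the deployment policy above as an inline policy.
# "deployer" and "traffic-shadowing-deploy" are placeholder names.
with open("policy.json") as f:
    policy_document = f.read()

iam = boto3.client("iam")
iam.put_user_policy(
    UserName="deployer",
    PolicyName="traffic-shadowing-deploy",
    PolicyDocument=policy_document,
)
```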

## Usage

```python
@@ -40,7 +95,7 @@ shadowing = TrafficShadowing(
    s3_data_training_uri="s3://bucket/data/training",
    data_capture_config=data_capture_config,
)
shadowing.deploy_stack()
shadowing.deploy()
```

## How it works
19 changes: 19 additions & 0 deletions aws/traffic_shadowing/tests/stubs/rpc/mocker.py

Large diffs are not rendered by default.

57 changes: 20 additions & 37 deletions hydro_integrations/aws/cloudformation.py
@@ -5,7 +5,7 @@
import botocore
from hydro_integrations.aws.helpers import SessionMixin, AWSClientFactory
from hydro_integrations.aws.exceptions import (
    StackCanNotBeProcessed, StackIsBeingProcessed, StackNotFound
    StackCanNotBeProcessed, StackIsBeingProcessed, StackNotFound, StackOutputsNotFound
)

logger = logging.getLogger(__name__)
@@ -25,43 +25,31 @@ class CloudFormation(SessionMixin):
    """Base object for interacting with CloudFormation API."""
    def __init__(
        self,
        stack_url: str,
        stack_body: str,
        stack_name: str,
        stack_parameters: List[Dict[str, str]],
        stack_capabilities: List[str],
        session: boto3.Session,
    ):
        self.stack_url = stack_url
        self.stack_body = stack_body
        self.stack_name = stack_name
        self.stack_parameters = stack_parameters
        self.stack_capabilities = stack_capabilities
        self._session = session or boto3.Session()
        self._cf_client = AWSClientFactory.get_or_create_client('cloudformation', self._session)

        self.__stack_outputs = None
        self._cf_client = AWSClientFactory \
            .get_or_create_client('cloudformation', self._session)

    def _wait(self, name):
        waiter = self._cf_client.get_waiter(name)
        try:
            waiter.wait(
                StackName=self.stack_name,
            )
        except botocore.exceptions.WaiterError as error:
            logger.error(error)
            events = self._describe_stack_events_short()
            if events is not None:
                events.reverse()
                logger.error("Occurred events during stack life.")
                logging.error(pprint.pformat(events))
            else:
                logger.error("Could not find stack events.")
            raise StackCanNotBeProcessed from error
        waiter.wait(
            StackName=self.stack_name,
        )

    def _create_stack(self):
        """Synchronously create a CloudFormation stack."""
        self._cf_client.create_stack(
            StackName=self.stack_name,
            TemplateURL=self.stack_url,
            TemplateBody=self.stack_body,
            Parameters=self.stack_parameters,
            Capabilities=self.stack_capabilities,
        )
@@ -72,7 +60,7 @@ def _update_stack(self):
        try:
            self._cf_client.update_stack(
                StackName=self.stack_name,
                TemplateURL=self.stack_url,
                TemplateBody=self.stack_body,
                Parameters=self.stack_parameters,
                Capabilities=self.stack_capabilities,
            )
@@ -108,7 +96,7 @@ def _describe_stack_events(self) -> Union[dict, None]:
                StackName=self.stack_name,
            )['StackEvents']
        except botocore.exceptions.ClientError:
            logger.debug("Could not find stack %s", self.stack_name)
            logger.debug("Could not find the stack %s", self.stack_name)

    def _describe_stack_events_short(self) -> Union[List[dict], None]:
        """Describe stack events in shortened form."""
@@ -137,14 +125,10 @@ def _deploy_stack(self, update_stack_if_exists: bool = True):
        elif state in STACK_CREATION_FAILED:
            logger.error("The current stack failed to create. Please resolve the issue "
                         "and delete the stack first.")
            events = self._describe_stack_events_short()
            if events is not None:
                events.reverse()
                logger.error(pprint.pformat(events))
            raise StackCanNotBeProcessed()
            raise StackCanNotBeProcessed
        elif state in IN_PROGRESS_STATES:
            logger.warning("Stack is currently being processed. Please wait until the stack finishes.")
            raise StackIsBeingProcessed()
            raise StackIsBeingProcessed
        else:
            raise Exception("Reached unexpected state.")

@@ -156,11 +140,10 @@ def _delete_stack(self):
        )
        self._wait('stack_delete_complete')

    @property
    def stack_outputs(self) -> List[dict]:
        if self.__stack_outputs is None:
            stack = self._describe_stack()
            if stack is None:
                raise StackNotFound("Could not retrieve outputs of a nonexisting stack.")
            self.__stack_outputs = stack['Outputs']
        return self.__stack_outputs
    def _get_stack_outputs(self) -> List[dict]:
        stack = self._describe_stack()
        if stack is None:
            raise StackNotFound("Could not retrieve outputs of a nonexisting stack.")
        elif not stack.get('Outputs'):
            raise StackOutputsNotFound("Could not retrieve outputs of a stack.")
        return stack['Outputs']
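
For reference, a minimal sketch of how calling code might consume the reworked outputs lookup; `cf` stands in for a `CloudFormation` instance constructed elsewhere, and the fallback values are purely illustrative:

```python
from hydro_integrations.aws.exceptions import StackNotFound, StackOutputsNotFound

# `cf` is assumed to be a CloudFormation helper built elsewhere.
try:
    outputs = cf._get_stack_outputs()
except StackOutputsNotFound:
    outputs = []    # the stack exists but exposes no outputs yet
except StackNotFound:
    outputs = None  # the stack was never deployed or has been deleted
```

Since both exceptions derive from the shared `NotFound` base introduced in this commit, a single `except NotFound` clause also covers both cases.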
10 changes: 9 additions & 1 deletion hydro_integrations/aws/exceptions.py
@@ -1,6 +1,14 @@
# pylint: disable=missing-class-docstring

class StackNotFound(Exception):
class NotFound(Exception):
    pass


class StackNotFound(NotFound):
    pass


class StackOutputsNotFound(NotFound):
    pass


2 changes: 1 addition & 1 deletion hydro_integrations/aws/sagemaker/__init__.py
@@ -1 +1 @@
from hydro_integrations.aws.sagemaker.traffic_shadowing import TrafficShadowing
from hydro_integrations.aws.sagemaker.traffic_shadowing.traffic_shadowing import TrafficShadowing
6 changes: 4 additions & 2 deletions hydro_integrations/aws/sagemaker/exceptions.py
@@ -1,8 +1,10 @@
# pylint: disable=missing-class-docstring
from hydro_integrations.aws.exceptions import NotFound

class DataCaptureConfigException(Exception):

class FunctionNotFound(NotFound):
    pass


class FunctionNotFound(Exception):
class DataCaptureConfigException(Exception):
    pass
Empty file.
151 changes: 151 additions & 0 deletions hydro_integrations/aws/sagemaker/traffic_shadowing/template.yaml
@@ -0,0 +1,151 @@
AWSTemplateFormatVersion: '2010-09-09'
Description: >
  CloudFormation template to deploy resources for shadowing traffic from SageMaker
  Model Endpoint to Hydrosphere instance.
Parameters:
  S3DataCaptureBucketName:
    Type: String
  S3DataCapturePrefix:
    Type: String
  S3DataTrainingBucketName:
    Type: String
  S3DataTrainingPrefix:
    Type: String
  HydrosphereEndpoint:
    Type: String
Resources:
  LambdaInvokePermission:
    Type: 'AWS::Lambda::Permission'
    Properties:
      FunctionName: !GetAtt TrafficShadowingFunction.Arn
      Action: 'lambda:InvokeFunction'
      Principal: s3.amazonaws.com
      SourceAccount: !Ref 'AWS::AccountId'
      SourceArn: !Sub 'arn:aws:s3:::${S3DataCaptureBucketName}'
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: '/'
      Policies:
        - PolicyName: traffic-shadowing-policy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:*
                Resource: '*'
              - Effect: Allow
                Action:
                  - logs:*
                Resource: arn:aws:logs:*:*:*
  LambdaZipsBucket:
    Type: AWS::S3::Bucket
  CopyZips:
    Type: Custom::CopyZips
    Properties:
      ServiceToken: !GetAtt CopyZipsFunction.Arn
      DestRegion: !Ref AWS::Region
      DestBucket: !Ref LambdaZipsBucket
      SourceBucket: hydrosphere-integrations-eu-west-3
      Prefix: ""
      Objects:
        - lambda/traffic_shadowing/1a46812f177b62fc77b02818f04c0ff4
  CopyZipsFunction:
    Type: AWS::Lambda::Function
    Properties:
      Description: Copies objects from a source S3 bucket to a destination bucket
      Handler: index.handler
      Runtime: python3.7
      Role: !GetAtt LambdaExecutionRole.Arn
      Timeout: 240
      Code:
        ZipFile: |
          import json
          import logging
          import threading
          import boto3
          import cfnresponse
          def copy_objects(source_bucket, dest_bucket, prefix, objects):
              s3 = boto3.client('s3')
              for o in objects:
                  key = prefix + o
                  copy_source = {
                      'Bucket': source_bucket,
                      'Key': key
                  }
                  s3.copy_object(
                      CopySource=copy_source,
                      Bucket=dest_bucket,
                      Key=key,
                      RequestPayer='requester',
                  )
          def delete_objects(bucket, prefix, objects):
              s3 = boto3.client('s3')
              objects = {'Objects': [{'Key': prefix + o} for o in objects]}
              s3.delete_objects(Bucket=bucket, Delete=objects)
          def timeout(event, context):
              logging.error('Execution is about to time out, sending failure response to CloudFormation')
              cfnresponse.send(event, context, cfnresponse.FAILED, {}, None)
          def handler(event, context):
              # make sure we send a failure to CloudFormation if the function is going to timeout
              timer = threading.Timer((context.get_remaining_time_in_millis() / 1000.00) - 0.5, timeout, args=[event, context])
              timer.start()
              print('Received event: %s' % json.dumps(event))
              status = cfnresponse.SUCCESS
              try:
                  source_bucket = event['ResourceProperties']['SourceBucket']
                  dest_bucket = event['ResourceProperties']['DestBucket']
                  prefix = event['ResourceProperties']['Prefix']
                  objects = event['ResourceProperties']['Objects']
                  if event['RequestType'] == 'Delete':
                      delete_objects(dest_bucket, prefix, objects)
                  else:
                      copy_objects(source_bucket, dest_bucket, prefix, objects)
              except Exception as e:
                  logging.error('Exception: %s' % e, exc_info=True)
                  status = cfnresponse.FAILED
              finally:
                  timer.cancel()
                  cfnresponse.send(event, context, status, {}, None)
  TrafficShadowingFunction:
    DependsOn: CopyZips
    Type: AWS::Lambda::Function
    Properties:
      Description: |
        Stateless function, triggered by s3:ObjectCreated:* events.
        Registers external models, uploads training data, submits
        production data for analysis.
      Code:
        S3Bucket: !Ref LambdaZipsBucket
        S3Key: lambda/traffic_shadowing/1a46812f177b62fc77b02818f04c0ff4
      Timeout: 240
      MemorySize: 256
      Runtime: python3.7
      Handler: src.handler.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Environment:
        Variables:
          S3_DATA_CAPTURE_BUCKET: !Ref S3DataCaptureBucketName
          S3_DATA_CAPTURE_PREFIX: !Ref S3DataCapturePrefix
          S3_DATA_TRAINING_BUCKET: !Ref S3DataTrainingBucketName
          S3_DATA_TRAINING_PREFIX: !Ref S3DataTrainingPrefix
          HYDROSPHERE_ENDPOINT: !Ref HydrosphereEndpoint
      ReservedConcurrentExecutions: 3
  TrafficShadowingVersion:
    Type: AWS::Lambda::Version
    Properties:
      FunctionName: !Ref TrafficShadowingFunction
Outputs:
  TrafficShadowingFunctionArn:
    Value: !GetAtt TrafficShadowingFunction.Arn
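
For reference, a minimal sketch of deploying this template on its own with boto3 (the `TrafficShadowing` helper normally does this for you). All parameter values below are placeholders, and `CAPABILITY_IAM` is required because the stack creates `LambdaExecutionRole`:

```python
import boto3

# Standalone deployment sketch; every value below is a placeholder.
cloudformation = boto3.client("cloudformation")

with open("template.yaml") as f:
    template_body = f.read()

cloudformation.create_stack(
    StackName="traffic-shadowing",
    TemplateBody=template_body,
    Capabilities=["CAPABILITY_IAM"],
    Parameters=[
        {"ParameterKey": "S3DataCaptureBucketName", "ParameterValue": "my-capture-bucket"},
        {"ParameterKey": "S3DataCapturePrefix", "ParameterValue": "data/captured"},
        {"ParameterKey": "S3DataTrainingBucketName", "ParameterValue": "my-training-bucket"},
        {"ParameterKey": "S3DataTrainingPrefix", "ParameterValue": "data/training"},
        {"ParameterKey": "HydrosphereEndpoint", "ParameterValue": "https://hydrosphere.example.com"},
    ],
)
cloudformation.get_waiter("stack_create_complete").wait(StackName="traffic-shadowing")
```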
