Adding functionality to return export ARN for point in time export in DynamoDBToS3Operator #45025

Open · wants to merge 5 commits into base: main

@@ -166,15 +166,15 @@ def execute(self, context: Context) -> None:
         # function. This change introduces a new boolean, as the indicator for whether the operator scans
         # and export entire data or using the point in time functionality.
         if self.point_in_time_export or self.export_time:
-            self._export_table_to_point_in_time()
+            return self._export_table_to_point_in_time()
         else:
-            self._export_entire_data()
+            return self._export_entire_data()
Comment on lines -169 to +171
Contributor

If adding a return statement, then there is no need for the else.

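A minimal sketch of the shape the reviewer is suggesting, using the names from the diff above (method body only; the class definition and imports are omitted, and this snippet is illustrative rather than part of the pull request):

```python
def execute(self, context: Context):
    # With an explicit return on the point-in-time path, the else branch
    # becomes unnecessary; the trailing return covers the full-scan case.
    if self.point_in_time_export or self.export_time:
        return self._export_table_to_point_in_time()
    return self._export_entire_data()
```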

     def _export_table_to_point_in_time(self):
         """
         Export data to point in time.

-        Full export exports data from start of epoc till `export_time`.
+        Full export exports data from start of epoc till `export_time` and returns the export ARN.
         Table export will be a snapshot of the table's state at this point in time.

         Incremental export exports the data from a specific datetime to a specific datetime
@@ -210,6 +210,8 @@ def _export_table_to_point_in_time(self):
             WaiterConfig={"Delay": self.check_interval, "MaxAttempts": self.max_attempts},
         )

+        return export_arn
+
     def _export_entire_data(self):
         """Export all data from the table."""
         table = self.hook.conn.Table(self.dynamodb_table_name)
@@ -225,6 +227,7 @@ def _export_entire_data(self):
         finally:
             if err is None:
                 _upload_file_to_s3(f, self.s3_bucket_name, self.s3_key_prefix, self.dest_aws_conn_id)
+        return None

     def _scan_dynamodb_and_upload_to_s3(self, temp_file: IO, scan_kwargs: dict, table: Any) -> IO:
         while True:
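For context, a hedged sketch of how a DAG could consume the export ARN that execute() now returns. The DAG id, table name, bucket name, and key prefix below are placeholders, and reading the operator's return value from XCom assumes do_xcom_push is left at its default.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

with DAG(dag_id="dynamodb_export_arn_example", start_date=datetime(2024, 1, 1), schedule=None):
    export = DynamoDBToS3Operator(
        task_id="export_table",
        dynamodb_table_name="my_table",      # placeholder table name
        s3_bucket_name="my-export-bucket",   # placeholder bucket
        s3_key_prefix="exports/",            # placeholder prefix
        point_in_time_export=True,
    )

    # With this change, execute() returns the export ARN, so a downstream
    # task could pull it from XCom, e.g.:
    #   arn = ti.xcom_pull(task_ids="export_table")
```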