Skip to content

Commit

Permalink
Add DMS Serverless Operators (#43988)
Browse files Browse the repository at this point in the history
* Adding DMS serverless operators

---------

Co-authored-by: Niko Oliveira <[email protected]>
Co-authored-by: mse139 <[email protected]>
Co-authored-by: Elad Kalif <[email protected]>
  • Loading branch information
4 people authored Dec 17, 2024
1 parent 98d9c38 commit ce4236f
Show file tree
Hide file tree
Showing 11 changed files with 2,785 additions and 3 deletions.
86 changes: 86 additions & 0 deletions docs/apache-airflow-providers-amazon/operators/dms.rst
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,92 @@ To delete a replication task you can use
:start-after: [START howto_operator_dms_delete_task]
:end-before: [END howto_operator_dms_delete_task]

.. _howto/operator:DmsCreateReplicationConfigOperator:

Create a serverless replication config
======================================

To create a serverless replication config use
:class:`~airflow.providers.amazon.aws.operators.dms.DmsCreateReplicationConfigOperator`.

.. exampleinclude:: /../../providers/tests/system/amazon/aws/example_dms_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dms_create_replication_config]
:end-before: [END howto_operator_dms_create_replication_config]

.. _howto/operator:DmsDescribeReplicationConfigsOperator:

Describe a serverless replication config
========================================

To describe a serverless replication config use
:class:`~airflow.providers.amazon.aws.operators.dms.DmsDescribeReplicationConfigsOperator`.

.. exampleinclude:: /../../providers/tests/system/amazon/aws/example_dms_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dms_describe_replication_config]
:end-before: [END howto_operator_dms_describe_replication_config]

.. _howto/operator:DmsStartReplicationOperator:

Start a serverless replication
==============================

To start a serverless replication use
:class:`~airflow.providers.amazon.aws.operators.dms.DmsStartReplicationOperator`.

.. exampleinclude:: /../../providers/tests/system/amazon/aws/example_dms_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dms_serverless_start_replication]
:end-before: [END howto_operator_dms_serverless_start_replication]

.. _howto/operator:DmsStopReplicationOperator:

Stop a serverless replication
==============================

To stop a serverless replication use
:class:`~airflow.providers.amazon.aws.operators.dms.DmsStopReplicationOperator`.

.. exampleinclude:: /../../providers/tests/system/amazon/aws/example_dms_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dms_serverless_stop_replication]
:end-before: [END howto_operator_dms_serverless_stop_replication]

.. _howto/operator:DmsDescribeReplicationsOperator:

Get the status of a serverless replication
==========================================

To get the status of a serverless replication use
:class:`~airflow.providers.amazon.aws.operators.dms.DmsDescribeReplicationsOperator`.

.. exampleinclude:: /../../providers/tests/system/amazon/aws/example_dms_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dms_serverless_describe_replication]
:end-before: [END howto_operator_dms_serverless_describe_replication]

.. _howto/operator:DmsDeleteReplicationConfigOperator:

Delete a serverless replication configuration
=============================================

To delete a serverless replication config use
:class:`~airflow.providers.amazon.aws.operators.dms.DmsDeleteReplicationConfigOperator`.

.. exampleinclude:: /../../providers/tests/system/amazon/aws/example_dms_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dms_serverless_delete_replication_config]
:end-before: [END howto_operator_dms_serverless_delete_replication_config]



Sensors
-------

Expand Down
161 changes: 161 additions & 0 deletions providers/src/airflow/providers/amazon/aws/hooks/dms.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
from __future__ import annotations

import json
from datetime import datetime
from enum import Enum
from typing import Any

from botocore.exceptions import ClientError
from dateutil import parser

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

Expand Down Expand Up @@ -219,3 +224,159 @@ def wait_for_task_status(self, replication_task_arn: str, status: DmsTaskWaiterS
],
WithoutSettings=True,
)

def describe_replication_configs(self, filters: list[dict] | None = None, **kwargs) -> list[dict]:
"""
Return list of serverless replication configs.
.. seealso::
- :external+boto3:py:meth:`DatabaseMigrationService.Client.describe_replication_configs`
:param filters: List of filter objects
:return: List of replication tasks
"""
filters = filters if filters is not None else []

try:
resp = self.conn.describe_replication_configs(Filters=filters, **kwargs)
return resp.get("ReplicationConfigs", [])
except Exception as ex:
self.log.error("Error while describing replication configs: %s", str(ex))
raise ex

def create_replication_config(
self,
replication_config_id: str,
source_endpoint_arn: str,
target_endpoint_arn: str,
compute_config: dict[str, Any],
replication_type: str,
table_mappings: str,
additional_config_kwargs: dict[str, Any] | None = None,
**kwargs,
):
"""
Create an AWS DMS Serverless configuration that can be used to start an DMS Serverless replication.
.. seealso::
- :external+boto3:py:meth:`DatabaseMigrationService.Client.create_replication_config`
:param replicationConfigId: Unique identifier used to create a ReplicationConfigArn.
:param sourceEndpointArn: ARN of the source endpoint
:param targetEndpointArn: ARN of the target endpoint
:param computeConfig: Parameters for provisioning an DMS Serverless replication.
:param replicationType: type of DMS Serverless replication
:param tableMappings: JSON table mappings
:param tags: Key-value tag pairs
:param resourceId: Unique value or name that you set for a given resource that can be used to construct an Amazon Resource Name (ARN) for that resource.
:param supplementalSettings: JSON settings for specifying supplemental data
:param replicationSettings: JSON settings for DMS Serverless replications
:return: ReplicationConfigArn
"""
if additional_config_kwargs is None:
additional_config_kwargs = {}
try:
resp = self.conn.create_replication_config(
ReplicationConfigIdentifier=replication_config_id,
SourceEndpointArn=source_endpoint_arn,
TargetEndpointArn=target_endpoint_arn,
ComputeConfig=compute_config,
ReplicationType=replication_type,
TableMappings=table_mappings,
**additional_config_kwargs,
)
arn = resp.get("ReplicationConfig", {}).get("ReplicationConfigArn")
self.log.info("Successfully created replication config: %s", arn)
return arn

except ClientError as err:
err_str = f"Error: {err.get('Error','').get('Code','')}: {err.get('Error','').get('Message','')}"
self.log.error("Error while creating replication config: %s", err_str)
raise err

def describe_replications(self, filters: list[dict[str, Any]] | None = None, **kwargs) -> list[dict]:
"""
Return list of serverless replications.
.. seealso::
- :external+boto3:py:meth:`DatabaseMigrationService.Client.describe_replications`
:param filters: List of filter objects
:return: List of replications
"""
filters = filters if filters is not None else []
try:
resp = self.conn.describe_replications(Filters=filters, **kwargs)
return resp.get("Replications", [])
except Exception as ex:
self.log.error("Error while describing replications: %s", str(ex))
raise ex

def delete_replication_config(
self, replication_config_arn: str, delay: int = 60, max_attempts: int = 120
):
"""
Delete an AWS DMS Serverless configuration.
.. seealso::
- :external+boto3:py:meth:`DatabaseMigrationService.Client.delete_replication_config`
:param replication_config_arn: ReplicationConfigArn
"""
try:
self.log.info("Deleting replication config: %s", replication_config_arn)

self.conn.delete_replication_config(ReplicationConfigArn=replication_config_arn)

except ClientError as err:
err_str = (
f"Error: {err.get('Error', '').get('Code', '')}: {err.get('Error', '').get('Message', '')}"
)
self.log.error("Error while deleting replication config: %s", err_str)
raise err

def start_replication(
self,
replication_config_arn: str,
start_replication_type: str,
cdc_start_time: datetime | str | None = None,
cdc_start_pos: str | None = None,
cdc_stop_pos: str | None = None,
):
additional_args: dict[str, Any] = {}

if cdc_start_time:
additional_args["CdcStartTime"] = (
cdc_start_time if isinstance(cdc_start_time, datetime) else parser.parse(cdc_start_time)
)
if cdc_start_pos:
additional_args["CdcStartPosition"] = cdc_start_pos
if cdc_stop_pos:
additional_args["CdcStopPosition"] = cdc_stop_pos

try:
resp = self.conn.start_replication(
ReplicationConfigArn=replication_config_arn,
StartReplicationType=start_replication_type,
**additional_args,
)

return resp
except Exception as ex:
self.log.error("Error while starting replication: %s", str(ex))
raise ex

def stop_replication(self, replication_config_arn: str):
resp = self.conn.stop_replication(ReplicationConfigArn=replication_config_arn)
return resp

def get_provision_status(self, replication_config_arn: str) -> str:
"""Get the provisioning status for a serverless replication."""
result = self.describe_replications(
filters=[{"Name": "replication-config-arn", "Values": [replication_config_arn]}]
)

provision_status = result[0].get("ProvisionData", {}).get("ProvisionState", "")
return provision_status
Loading

0 comments on commit ce4236f

Please sign in to comment.