Skip to content

Commit 85155d2

Browse files
Lee-Wjoaopamaral
authored andcommitted
fix(providers/amazon): handle ClientError raised after key is missing during table.get_item (apache#42408)
1 parent a045756 commit 85155d2

File tree

2 files changed

+39
-8
lines changed

2 files changed

+39
-8
lines changed

airflow/providers/amazon/aws/sensors/dynamodb.py

+22-8
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
from typing import TYPE_CHECKING, Any, Iterable, Sequence
2020

21+
from botocore.exceptions import ClientError
22+
2123
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
2224
from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
2325
from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
@@ -102,14 +104,26 @@ def poke(self, context: Context) -> bool:
102104
table = self.hook.conn.Table(self.table_name)
103105
self.log.info("Table: %s", table)
104106
self.log.info("Key: %s", key)
105-
response = table.get_item(Key=key)
107+
106108
try:
107-
item_attribute_value = response["Item"][self.attribute_name]
108-
self.log.info("Response: %s", response)
109-
self.log.info("Want: %s = %s", self.attribute_name, self.attribute_value)
110-
self.log.info("Got: {response['Item'][self.attribute_name]} = %s", item_attribute_value)
111-
return item_attribute_value in (
112-
[self.attribute_value] if isinstance(self.attribute_value, str) else self.attribute_value
109+
response = table.get_item(Key=key)
110+
except ClientError as err:
111+
self.log.error(
112+
"Couldn't get %s from table %s.\nError Code: %s\nError Message: %s",
113+
key,
114+
self.table_name,
115+
err.response["Error"]["Code"],
116+
err.response["Error"]["Message"],
113117
)
114-
except KeyError:
115118
return False
119+
else:
120+
try:
121+
item_attribute_value = response["Item"][self.attribute_name]
122+
self.log.info("Response: %s", response)
123+
self.log.info("Want: %s = %s", self.attribute_name, self.attribute_value)
124+
self.log.info("Got: {response['Item'][self.attribute_name]} = %s", item_attribute_value)
125+
return item_attribute_value in (
126+
[self.attribute_value] if isinstance(self.attribute_value, str) else self.attribute_value
127+
)
128+
except KeyError:
129+
return False

tests/providers/amazon/aws/sensors/test_dynamodb.py

+17
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,23 @@ def test_sensor_with_pk_and_sk(self):
104104

105105
assert self.sensor_pk_sk.poke(None)
106106

107+
@mock_aws
108+
def test_sensor_with_client_error(self):
109+
hook = DynamoDBHook(table_name=self.table_name, table_keys=[self.pk_name])
110+
111+
hook.conn.create_table(
112+
TableName=self.table_name,
113+
KeySchema=[{"AttributeName": self.pk_name, "KeyType": "HASH"}],
114+
AttributeDefinitions=[{"AttributeName": self.pk_name, "AttributeType": "S"}],
115+
ProvisionedThroughput={"ReadCapacityUnits": 10, "WriteCapacityUnits": 10},
116+
)
117+
118+
items = [{self.pk_name: self.pk_value, self.attribute_name: self.attribute_value}]
119+
hook.write_batch_data(items)
120+
121+
self.sensor_pk.partition_key_name = "no such key"
122+
assert self.sensor_pk.poke(None) is False
123+
107124

108125
class TestDynamoDBMultipleValuesSensor:
109126
def setup_method(self):

0 commit comments

Comments
 (0)