Skip to content

Commit b7c22d1

Browse files
github-actions[bot]puchy22jfagoagasMrCloudSec
authored
fix(autoscaling): Add exception manage while decoding UserData (#4675)
Co-authored-by: Rubén De la Torre Vico <[email protected]> Co-authored-by: Pepe Fagoaga <[email protected]> Co-authored-by: Sergio <[email protected]>
1 parent 318d2b1 commit b7c22d1

File tree

8 files changed

+112
-14
lines changed

8 files changed

+112
-14
lines changed

prowler/config/config.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ def get_available_compliance_frameworks():
6161
default_config_file_path = (
6262
f"{pathlib.Path(os.path.dirname(os.path.realpath(__file__)))}/config.yaml"
6363
)
64+
encoding_format_utf_8 = "utf-8"
6465

6566

6667
def check_current_version():
@@ -102,8 +103,7 @@ def load_and_validate_config_file(provider: str, config_file_path: str) -> dict:
102103
load_and_validate_config_file reads the Prowler config file in YAML format from the default location or the file passed with the --config-file flag
103104
"""
104105
try:
105-
with open(config_file_path) as f:
106-
config = {}
106+
with open(config_file_path, "r", encoding=encoding_format_utf_8) as f:
107107
config_file = yaml.safe_load(f)
108108

109109
# Not to introduce a breaking change we have to allow the old format config file without any provider keys

prowler/lib/utils/utils.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@
1212
from detect_secrets import SecretsCollection
1313
from detect_secrets.settings import default_settings
1414

15+
from prowler.config.config import encoding_format_utf_8
1516
from prowler.lib.logger import logger
1617

1718

1819
def open_file(input_file: str, mode: str = "r") -> TextIOWrapper:
1920
"""open_file returns a handler to the file using the specified mode."""
2021
try:
21-
f = open(input_file, mode)
22+
f = open(input_file, mode, encoding=encoding_format_utf_8)
2223
except OSError as os_error:
2324
if os_error.strerror == "Too many open files":
2425
logger.critical(
@@ -66,7 +67,7 @@ def file_exists(filename: str):
6667

6768
def hash_sha512(string: str) -> str:
6869
"""hash_sha512 returns the first 9 bytes of the SHA512 representation for the given string."""
69-
return sha512(string.encode("utf-8")).hexdigest()[0:9]
70+
return sha512(string.encode(encoding_format_utf_8)).hexdigest()[0:9]
7071

7172

7273
def detect_secrets_scan(data):

prowler/providers/aws/services/autoscaling/autoscaling_find_secrets_ec2_launch_configuration/autoscaling_find_secrets_ec2_launch_configuration.py

+18-5
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
from detect_secrets import SecretsCollection
77
from detect_secrets.settings import default_settings
88

9+
from prowler.config.config import encoding_format_utf_8
910
from prowler.lib.check.models import Check, Check_Report_AWS
11+
from prowler.lib.logger import logger
1012
from prowler.providers.aws.services.autoscaling.autoscaling_client import (
1113
autoscaling_client,
1214
)
@@ -25,12 +27,23 @@ def execute(self):
2527
temp_user_data_file = tempfile.NamedTemporaryFile(delete=False)
2628
user_data = b64decode(configuration.user_data)
2729

28-
if user_data[0:2] == b"\x1f\x8b": # GZIP magic number
29-
user_data = zlib.decompress(user_data, zlib.MAX_WBITS | 32).decode(
30-
"utf-8"
30+
try:
31+
if user_data[0:2] == b"\x1f\x8b": # GZIP magic number
32+
user_data = zlib.decompress(
33+
user_data, zlib.MAX_WBITS | 32
34+
).decode(encoding_format_utf_8)
35+
else:
36+
user_data = user_data.decode(encoding_format_utf_8)
37+
except UnicodeDecodeError as error:
38+
logger.warning(
39+
f"{configuration.region} -- Unable to decode user data in autoscaling launch configuration {configuration.name}: {error}"
3140
)
32-
else:
33-
user_data = user_data.decode("utf-8")
41+
continue
42+
except Exception as error:
43+
logger.warning(
44+
f"{configuration.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
45+
)
46+
continue
3447

3548
temp_user_data_file.write(
3649
bytes(user_data, encoding="raw_unicode_escape")

prowler/providers/aws/services/ec2/ec2_instance_secrets_user_data/ec2_instance_secrets_user_data.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from detect_secrets import SecretsCollection
77
from detect_secrets.settings import default_settings
88

9+
from prowler.config.config import encoding_format_utf_8
910
from prowler.lib.check.models import Check, Check_Report_AWS
1011
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
1112

@@ -26,9 +27,9 @@ def execute(self):
2627
if user_data[0:2] == b"\x1f\x8b": # GZIP magic number
2728
user_data = zlib.decompress(
2829
user_data, zlib.MAX_WBITS | 32
29-
).decode("utf-8")
30+
).decode(encoding_format_utf_8)
3031
else:
31-
user_data = user_data.decode("utf-8")
32+
user_data = user_data.decode(encoding_format_utf_8)
3233

3334
temp_user_data_file.write(
3435
bytes(user_data, encoding="raw_unicode_escape")

prowler/providers/aws/services/iam/iam_service.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from botocore.client import ClientError
66
from pydantic import BaseModel
77

8+
from prowler.config.config import encoding_format_utf_8
89
from prowler.lib.logger import logger
910
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
1011
from prowler.providers.aws.lib.service.service import AWSService
@@ -144,7 +145,9 @@ def __get_credential_report__(self):
144145
if report_status["State"] == "COMPLETE":
145146
report_is_completed = True
146147
# Convert credential report to list of dictionaries
147-
credential = self.client.get_credential_report()["Content"].decode("utf-8")
148+
credential = self.client.get_credential_report()["Content"].decode(
149+
encoding_format_utf_8
150+
)
148151
credential_lines = credential.split("\n")
149152
csv_reader = csv.DictReader(credential_lines, delimiter=",")
150153
credential_list = list(csv_reader)

tests/providers/aws/services/autoscaling/autoscaling_find_secrets_ec2_launch_configuration/autoscaling_find_secrets_ec2_launch_configuration_test.py

+74
Original file line numberDiff line numberDiff line change
@@ -287,3 +287,77 @@ def test_one_autoscaling_file_with_secrets_gzip(self):
287287
assert result[0].resource_id == launch_configuration_name
288288
assert result[0].resource_arn == launch_configuration_arn
289289
assert result[0].region == AWS_REGION_US_EAST_1
290+
291+
@mock_aws
292+
def test_one_autoscaling_file_with_unicode_error(self):
293+
# Include launch_configurations to check
294+
invalid_utf8_bytes = b"\xc0\xaf"
295+
launch_configuration_name = "tester"
296+
autoscaling_client = client("autoscaling", region_name=AWS_REGION_US_EAST_1)
297+
autoscaling_client.create_launch_configuration(
298+
LaunchConfigurationName=launch_configuration_name,
299+
ImageId="ami-12c6146b",
300+
InstanceType="t1.micro",
301+
KeyName="the_keys",
302+
SecurityGroups=["default", "default2"],
303+
UserData=invalid_utf8_bytes,
304+
)
305+
306+
from prowler.providers.aws.services.autoscaling.autoscaling_service import (
307+
AutoScaling,
308+
)
309+
310+
current_audit_info = set_mocked_aws_audit_info([AWS_REGION_US_EAST_1])
311+
312+
with mock.patch(
313+
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
314+
new=current_audit_info,
315+
), mock.patch(
316+
"prowler.providers.aws.services.autoscaling.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_client",
317+
new=AutoScaling(current_audit_info),
318+
):
319+
from prowler.providers.aws.services.autoscaling.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_find_secrets_ec2_launch_configuration import (
320+
autoscaling_find_secrets_ec2_launch_configuration,
321+
)
322+
323+
check = autoscaling_find_secrets_ec2_launch_configuration()
324+
result = check.execute()
325+
326+
assert len(result) == 0
327+
328+
@mock_aws
329+
def test_one_autoscaling_file_invalid_gzip_error(self):
330+
# Include launch_configurations to check
331+
invalid_gzip_bytes = b"\x1f\x8b\xc0\xaf"
332+
launch_configuration_name = "tester"
333+
autoscaling_client = client("autoscaling", region_name=AWS_REGION_US_EAST_1)
334+
autoscaling_client.create_launch_configuration(
335+
LaunchConfigurationName=launch_configuration_name,
336+
ImageId="ami-12c6146b",
337+
InstanceType="t1.micro",
338+
KeyName="the_keys",
339+
SecurityGroups=["default", "default2"],
340+
UserData=invalid_gzip_bytes,
341+
)
342+
343+
from prowler.providers.aws.services.autoscaling.autoscaling_service import (
344+
AutoScaling,
345+
)
346+
347+
current_audit_info = set_mocked_aws_audit_info([AWS_REGION_US_EAST_1])
348+
349+
with mock.patch(
350+
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
351+
new=current_audit_info,
352+
), mock.patch(
353+
"prowler.providers.aws.services.autoscaling.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_client",
354+
new=AutoScaling(current_audit_info),
355+
):
356+
from prowler.providers.aws.services.autoscaling.autoscaling_find_secrets_ec2_launch_configuration.autoscaling_find_secrets_ec2_launch_configuration import (
357+
autoscaling_find_secrets_ec2_launch_configuration,
358+
)
359+
360+
check = autoscaling_find_secrets_ec2_launch_configuration()
361+
result = check.execute()
362+
363+
assert len(result) == 0

tests/providers/aws/services/autoscaling/autoscaling_service_test.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from boto3 import client
44
from moto import mock_aws
55

6+
from prowler.config.config import encoding_format_utf_8
67
from prowler.providers.aws.services.autoscaling.autoscaling_service import AutoScaling
78
from tests.providers.aws.audit_info_utils import (
89
AWS_ACCOUNT_NUMBER,
@@ -72,7 +73,9 @@ def test__describe_launch_configurations__(self):
7273
assert len(autoscaling.launch_configurations) == 2
7374
assert autoscaling.launch_configurations[0].name == "tester1"
7475
assert (
75-
b64decode(autoscaling.launch_configurations[0].user_data).decode("utf-8")
76+
b64decode(autoscaling.launch_configurations[0].user_data).decode(
77+
encoding_format_utf_8
78+
)
7679
== "DB_PASSWORD=foobar123"
7780
)
7881
assert autoscaling.launch_configurations[0].image_id == "ami-12c6146b"

tests/providers/aws/services/ec2/ec2_service_test.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from freezegun import freeze_time
99
from moto import mock_aws
1010

11+
from prowler.config.config import encoding_format_utf_8
1112
from prowler.providers.aws.services.ec2.ec2_service import EC2
1213
from tests.providers.aws.audit_info_utils import (
1314
AWS_ACCOUNT_NUMBER,
@@ -316,7 +317,9 @@ def test__get_instance_user_data__(self):
316317
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
317318
)
318319
ec2 = EC2(audit_info)
319-
assert user_data == b64decode(ec2.instances[0].user_data).decode("utf-8")
320+
assert user_data == b64decode(ec2.instances[0].user_data).decode(
321+
encoding_format_utf_8
322+
)
320323

321324
# Test EC2 Get EBS Encryption by default
322325
@mock_aws

0 commit comments

Comments
 (0)