Skip to content

Commit

Permalink
fix(aws): only check artifacts that can be scanned for vulnerabilitie…
Browse files Browse the repository at this point in the history
…s by `ecr_repositories_scan_vulnerabilities_in_latest_image` (#4677)

Co-authored-by: Kay Agahd <[email protected]>
Co-authored-by: Pepe Fagoaga <[email protected]>
Co-authored-by: Sergio <[email protected]>
  • Loading branch information
4 people authored Aug 7, 2024
1 parent c54227b commit 318d2b1
Show file tree
Hide file tree
Showing 4 changed files with 447 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,27 @@ def execute(self):
for repository in registry.repositories:
# First check if the repository has images
if len(repository.images_details) > 0:
# We only want to check the latest image pushed
# We only want to check the latest image pushed that is scannable
image = repository.images_details[-1]

report = Check_Report_AWS(self.metadata())
report.region = repository.region
report.resource_id = repository.name
report.resource_arn = repository.arn
report.resource_tags = repository.tags
report.status = "PASS"
report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} scanned without findings."
status_extended_prefix = f"ECR repository '{repository.name}' has scanned the {image.type} container image with digest '{image.latest_digest}' and tag '{image.latest_tag}' "
report.status_extended = (
status_extended_prefix + "without findings."
)
if not image.scan_findings_status:
report.status = "FAIL"
report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} without a scan."
report.status_extended = (
status_extended_prefix + "without a scan."
)
elif image.scan_findings_status == "FAILED":
report.status = "FAIL"
report.status_extended = (
f"ECR repository {repository.name} with scan status FAILED."
status_extended_prefix + "with scan status FAILED."
)
elif (
image.scan_findings_status != "FAILED"
Expand All @@ -42,20 +46,29 @@ def execute(self):
and image.scan_findings_severity_count.critical
):
report.status = "FAIL"
report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} scanned with findings: CRITICAL->{image.scan_findings_severity_count.critical}."
report.status_extended = (
status_extended_prefix
+ f"with findings: CRITICAL->{image.scan_findings_severity_count.critical}."
)
elif minimum_severity == "HIGH" and (
image.scan_findings_severity_count.critical
or image.scan_findings_severity_count.high
):
report.status = "FAIL"
report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} scanned with findings: CRITICAL->{image.scan_findings_severity_count.critical}, HIGH->{image.scan_findings_severity_count.high}."
report.status_extended = (
status_extended_prefix
+ f"with findings: CRITICAL->{image.scan_findings_severity_count.critical}, HIGH->{image.scan_findings_severity_count.high}."
)
elif minimum_severity == "MEDIUM" and (
image.scan_findings_severity_count.critical
or image.scan_findings_severity_count.high
or image.scan_findings_severity_count.medium
):
report.status = "FAIL"
report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} scanned with findings: CRITICAL->{image.scan_findings_severity_count.critical}, HIGH->{image.scan_findings_severity_count.high}, MEDIUM->{image.scan_findings_severity_count.medium}."
report.status_extended = (
status_extended_prefix
+ f"with findings: CRITICAL->{image.scan_findings_severity_count.critical}, HIGH->{image.scan_findings_severity_count.high}, MEDIUM->{image.scan_findings_severity_count.medium}."
)

findings.append(report)

Expand Down
187 changes: 140 additions & 47 deletions prowler/providers/aws/services/ecr/ecr_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ def __init__(self, audit_info):
super().__init__(__class__.__name__, audit_info)
self.registry_id = audit_info.audited_account
self.registries = {}
self.__threading_call__(self.__describe_registries_and_repositories__)
self.__threading_call__(self.__describe_repository_policies__)
self.__threading_call__(self.__get_image_details__)
self.__threading_call__(self.__get_repository_lifecycle_policy__)
self.__threading_call__(self.__get_registry_scanning_configuration__)
self.__threading_call__(self.__list_tags_for_resource__)

def __describe_registries_and_repositories__(self, regional_client):
self.__threading_call__(self._describe_registries_and_repositories)
self.__threading_call__(self._describe_repository_policies)
self.__threading_call__(self._get_image_details)
self.__threading_call__(self._get_repository_lifecycle_policy)
self.__threading_call__(self._get_registry_scanning_configuration)
self.__threading_call__(self._list_tags_for_resource)

def _describe_registries_and_repositories(self, regional_client):
logger.info("ECR - Describing registries and repositories...")
regional_registry_repositories = []
try:
Expand Down Expand Up @@ -64,7 +64,7 @@ def __describe_registries_and_repositories__(self, regional_client):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def __describe_repository_policies__(self, regional_client):
def _describe_repository_policies(self, regional_client):
logger.info("ECR - Describing repository policies...")
try:
if regional_client.region in self.registries:
Expand All @@ -91,7 +91,7 @@ def __describe_repository_policies__(self, regional_client):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def __get_repository_lifecycle_policy__(self, regional_client):
def _get_repository_lifecycle_policy(self, regional_client):
logger.info("ECR - Getting repository lifecycle policy...")
try:
if regional_client.region in self.registries:
Expand Down Expand Up @@ -119,7 +119,7 @@ def __get_repository_lifecycle_policy__(self, regional_client):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def __get_image_details__(self, regional_client):
def _get_image_details(self, regional_client):
logger.info("ECR - Getting images details...")
try:
if regional_client.region in self.registries:
Expand All @@ -139,55 +139,108 @@ def __get_image_details__(self, regional_client):
# The following condition is required since sometimes
# the AWS ECR API returns None using the iterator
if image is not None:
severity_counts = None
last_scan_status = None
if "imageScanStatus" in image:
last_scan_status = image["imageScanStatus"][
"status"
]

if "imageScanFindingsSummary" in image:
severity_counts = FindingSeverityCounts(
critical=0, high=0, medium=0
)
finding_severity_counts = image[
artifact_media_type = image.get(
"artifactMediaType", None
)
tags = image.get("imageTags", [])
if ECR._is_artifact_scannable(
artifact_media_type, tags
):
severity_counts = None
last_scan_status = None
image_digest = image.get("imageDigest")
latest_tag = image.get("imageTags", ["None"])[0]
image_pushed_at = image.get("imagePushedAt")
image_scan_findings_field_name = (
"imageScanFindingsSummary"
]["findingSeverityCounts"]
if "CRITICAL" in finding_severity_counts:
)
if "docker" in artifact_media_type:
type = "Docker"
elif "oci" in artifact_media_type:
type = "OCI"
else:
type = ""

# If imageScanStatus is not present or imageScanFindingsSummary is missing,
# we need to call DescribeImageScanFindings because AWS' new version of
# basic scanning does not support imageScanFindingsSummary and imageScanStatus
# in the DescribeImages API.
if "imageScanStatus" not in image:
try:
# use "image" for scan findings to get data the same way as for an image
image = (
client.describe_image_scan_findings(
registryId=self.registries[
regional_client.region
].id,
repositoryName=repository.name,
imageId={
"imageDigest": image_digest
},
)
)
image_scan_findings_field_name = (
"imageScanFindings"
)
except (
client.exceptions.ImageNotFoundException
) as error:
logger.warning(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue

if "imageScanStatus" in image:
last_scan_status = image["imageScanStatus"][
"status"
]

if image_scan_findings_field_name in image:
severity_counts = FindingSeverityCounts(
critical=0, high=0, medium=0
)
finding_severity_counts = image[
image_scan_findings_field_name
]["findingSeverityCounts"]
severity_counts.critical = (
finding_severity_counts["CRITICAL"]
finding_severity_counts.get(
"CRITICAL", 0
)
)
if "HIGH" in finding_severity_counts:
severity_counts.high = (
finding_severity_counts["HIGH"]
finding_severity_counts.get("HIGH", 0)
)
if "MEDIUM" in finding_severity_counts:
severity_counts.medium = (
finding_severity_counts["MEDIUM"]
finding_severity_counts.get("MEDIUM", 0)
)

repository.images_details.append(
ImageDetails(
latest_tag=latest_tag,
image_pushed_at=image_pushed_at,
latest_digest=image_digest,
scan_findings_status=last_scan_status,
scan_findings_severity_count=severity_counts,
artifact_media_type=artifact_media_type,
type=type,
)
latest_tag = "None"
if image.get("imageTags"):
latest_tag = image["imageTags"][0]
repository.images_details.append(
ImageDetails(
latest_tag=latest_tag,
image_pushed_at=image["imagePushedAt"],
latest_digest=image["imageDigest"],
scan_findings_status=last_scan_status,
scan_findings_severity_count=severity_counts,
)
)
# Sort the repository images by date pushed
repository.images_details.sort(
key=lambda image: image.image_pushed_at
)
# Sort the repository images by date pushed
repository.images_details.sort(
key=lambda image: image.image_pushed_at
)

except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def __list_tags_for_resource__(self, regional_client):
def _list_tags_for_resource(self, regional_client):
logger.info("ECR - List Tags...")
try:
if regional_client.region in self.registries:
Expand Down Expand Up @@ -215,7 +268,7 @@ def __list_tags_for_resource__(self, regional_client):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def __get_registry_scanning_configuration__(self, regional_client):
def _get_registry_scanning_configuration(self, regional_client):
logger.info("ECR - Getting Registry Scanning Configuration...")
try:
if regional_client.region in self.registries:
Expand Down Expand Up @@ -251,6 +304,44 @@ def __get_registry_scanning_configuration__(self, regional_client):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

@staticmethod
def _is_artifact_scannable(artifact_media_type: str, tags: list[str] = []) -> bool:
"""
Check if an artifact is scannable based on its media type and tags.
Args:
artifact_media_type (str): The media type of the artifact.
tags (list): The list of tags associated with the artifact.
Returns:
bool: True if the artifact is scannable, False otherwise.
"""
try:
if artifact_media_type is None:
return False

# Tools like GoogleContainerTools/jib uses `application/vnd.oci.image.config.v1+json`` also for signatures, which are not scannable.
# Luckily, these are tagged with sha-<HASH-CODE>.sig, so that they can still be easily recognized.
for tag in tags:
if tag.startswith("sha256-") and tag.endswith(".sig"):
return False

scannable_artifact_media_types = [
"application/vnd.docker.container.image.v1+json", # Docker image configuration
"application/vnd.docker.image.rootfs.diff.tar", # Docker image layer as a tar archive
"application/vnd.docker.image.rootfs.diff.tar.gzip", # Docker image layer that is compressed using gzip
"application/vnd.oci.image.config.v1+json", # OCI image configuration, but also used by GoogleContainerTools/jib for signatures
"application/vnd.oci.image.layer.v1.tar", # Uncompressed OCI image layer
"application/vnd.oci.image.layer.v1.tar+gzip", # Compressed OCI image layer
]

return artifact_media_type in scannable_artifact_media_types
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False


class FindingSeverityCounts(BaseModel):
critical: int
Expand All @@ -264,6 +355,8 @@ class ImageDetails(BaseModel):
image_pushed_at: datetime
scan_findings_status: Optional[str]
scan_findings_severity_count: Optional[FindingSeverityCounts]
artifact_media_type: Optional[str]
type: str


class Repository(BaseModel):
Expand Down
Loading

0 comments on commit 318d2b1

Please sign in to comment.