Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(codepipeline): add new check codepipeline_project_repo_private #5915

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from prowler.providers.aws.services.codepipeline.codepipeline_service import (
CodePipeline,
)
from prowler.providers.common.provider import Provider

codepipeline_client = CodePipeline(Provider.get_global_provider())
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"Provider": "aws",
"CheckID": "codepipeline_project_repo_private",
"CheckTitle": "Ensure that CodePipeline projects do not use public GitHub or GitLab repositories as source.",
"CheckType": [],
"ServiceName": "codepipeline",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "medium",
"ResourceType": "Other",
"Description": "Ensure that CodePipeline projects do not use public GitHub or GitLab repositories as source.",
"Risk": "Using public Git repositories in CodePipeline projects could expose sensitive deployment configurations and increase the risk of supply chain attacks.",
"RelatedUrl": "https://docs.aws.amazon.com/codepipeline/latest/userguide/connections-github.html",
"Remediation": {
"Code": {
"CLI": "aws codestar-connections create-connection --provider-type GitHub|GitLab --connection-name <connection-name>",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Use private Git repositories for CodePipeline sources and ensure proper authentication is configured using AWS CodeStar Connections. Consider using AWS CodeCommit or other private repository solutions for sensitive code.",
"Url": "https://docs.aws.amazon.com/codepipeline/latest/userguide/connections"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check supports both GitHub and GitLab repositories through CodeStar Connections"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import ssl
import urllib.error
import urllib.request

from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.codepipeline.codepipeline_client import (
codepipeline_client,
)


class codepipeline_project_repo_private(Check):
"""Checks if AWS CodePipeline source repositories are configured as private.

This check verifies whether source repositories (GitHub or GitLab) connected to
CodePipeline are publicly accessible. It attempts to access the repositories
anonymously to determine their visibility status.

Attributes:
None
"""

def execute(self) -> list:
"""Executes the repository privacy check for all CodePipeline sources.

Iterates through all CodePipeline pipelines and checks if their source
repositories (GitHub/GitLab) are publicly accessible by attempting anonymous
access.

Returns:
list: List of Check_Report_AWS objects containing the findings for each
pipeline's source repository.
"""
findings = []

for pipeline in codepipeline_client.pipelines.values():
if (
pipeline.source
and pipeline.source.type == "CodeStarSourceConnection"
and "FullRepositoryId" in str(pipeline.source.configuration)
):
report = Check_Report_AWS(self.metadata())
report.region = pipeline.region
report.resource_id = pipeline.name
report.resource_arn = pipeline.arn
report.resource_tags = pipeline.tags

repo_id = pipeline.source.configuration.get("FullRepositoryId", "")

# Try both GitHub and GitLab URLs
github_url = f"https://github.com/{repo_id}"
gitlab_url = f"https://gitlab.com/{repo_id}"

is_public_github = self._is_public_repo(github_url)
is_public_gitlab = self._is_public_repo(gitlab_url)

if is_public_github:
report.status = "FAIL"
report.status_extended = f"CodePipeline {pipeline.name} source repository is public: {github_url}"
elif is_public_gitlab:
report.status = "FAIL"
report.status_extended = f"CodePipeline {pipeline.name} source repository is public: {gitlab_url}"
else:
report.status = "PASS"
report.status_extended = (
f"CodePipeline {pipeline.name} source repository is private"
)

findings.append(report)

return findings

def _is_public_repo(self, repo_url: str) -> bool:
"""Checks if a repository is publicly accessible.

Attempts to access the repository URL anonymously to determine if it's
public or private.

Args:
repo_url: String containing the repository URL to check.

Returns:
bool: True if the repository is public, False if private or inaccessible.

Note:
The method considers a repository private if:
- The URL redirects to a sign-in page
- The request fails with HTTP errors
- The URL is not accessible
"""
if repo_url.endswith(".git"):
repo_url = repo_url[:-4]

try:
context = ssl._create_unverified_context()
req = urllib.request.Request(repo_url, method="HEAD")
response = urllib.request.urlopen(req, context=context)
return not response.geturl().endswith("sign_in")
except (urllib.error.HTTPError, urllib.error.URLError):
return False
136 changes: 136 additions & 0 deletions prowler/providers/aws/services/codepipeline/codepipeline_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
from typing import Optional

from botocore.exceptions import ClientError
from pydantic import BaseModel

from prowler.lib.logger import logger
from prowler.providers.aws.lib.service.service import AWSService


class CodePipeline(AWSService):
"""AWS CodePipeline service class for managing pipeline resources.

This class handles interactions with AWS CodePipeline service, including
listing pipelines and retrieving their states. It manages pipeline resources
and their associated metadata.

Attributes:
pipelines: Dictionary mapping pipeline ARNs to Pipeline objects.
"""

def __init__(self, provider):
"""Initializes the CodePipeline service class.

Args:
provider: AWS provider instance for making API calls.
"""
super().__init__(__class__.__name__, provider)
self.pipelines = {}
self.__threading_call__(self._list_pipelines)
if self.pipelines:
self.__threading_call__(self._get_pipeline_state, self.pipelines.values())

def _list_pipelines(self, regional_client):
"""Lists all CodePipeline pipelines in the specified region.

Retrieves all pipelines using pagination and creates Pipeline objects
for each pipeline found.

Args:
regional_client: AWS regional client for CodePipeline service.

Raises:
ClientError: If there is an AWS API error.
"""
logger.info("CodePipeline - Listing pipelines...")
try:
list_pipelines_paginator = regional_client.get_paginator("list_pipelines")
for page in list_pipelines_paginator.paginate():
for pipeline in page["pipelines"]:
pipeline_arn = f"arn:{self.audited_partition}:codepipeline:{regional_client.region}:{self.audited_account}:pipeline/{pipeline['name']}"
if self.pipelines is None:
self.pipelines = {}
self.pipelines[pipeline_arn] = Pipeline(
name=pipeline["name"],
arn=pipeline_arn,
region=regional_client.region,
)
except ClientError as error:
if error.response["Error"]["Code"] == "AccessDenied":
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
if not self.pipelines:
self.pipelines = None
else:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def _get_pipeline_state(self, pipeline):
"""Retrieves the current state of a pipeline.

Gets detailed information about a pipeline including its source configuration
and tags.

Args:
pipeline: Pipeline object to retrieve state for.

Raises:
ClientError: If there is an AWS API error.
"""
logger.info("CodePipeline - Getting pipeline state...")
try:
regional_client = self.regional_clients[pipeline.region]
pipeline_info = regional_client.get_pipeline(name=pipeline.name)
source_info = pipeline_info["pipeline"]["stages"][0]["actions"][0]
pipeline.source = Source(
type=source_info["actionTypeId"]["provider"],
location=source_info["configuration"].get("FullRepositoryId", ""),
configuration=source_info["configuration"],
tags=pipeline_info.get("tags", []),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The key tags does not exist, you have to use the API call list_tags_for_resource

)
except ClientError as error:
logger.error(
f"{pipeline.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{pipeline.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)


class Source(BaseModel):
"""Model representing a pipeline source configuration.

Attributes:
type: The type of source provider.
location: The location/path of the source repository.
configuration: Optional dictionary containing additional source configuration.
"""

type: str
location: str
configuration: Optional[dict]


class Pipeline(BaseModel):
"""Model representing an AWS CodePipeline pipeline.

Attributes:
name: The name of the pipeline.
arn: The ARN (Amazon Resource Name) of the pipeline.
region: The AWS region where the pipeline exists.
source: Optional Source object containing source configuration.
tags: Optional list of pipeline tags.
"""

name: str
arn: str
region: str
source: Optional[Source]
tags: Optional[list] = []
Loading
Loading