Skip to content

Commit

Permalink
fix: codepipeline_service.py, test code
Browse files Browse the repository at this point in the history
  • Loading branch information
yyyy7246 committed Dec 14, 2024
1 parent 5c1c360 commit 5bfc7f5
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 138 deletions.
48 changes: 32 additions & 16 deletions prowler/providers/aws/services/codepipeline/codepipeline_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ def _list_pipelines(self, regional_client):
def _get_pipeline_state(self, pipeline):
"""Retrieves the current state of a pipeline.
Gets detailed information about a pipeline including its source configuration
and tags.
Gets detailed information about a pipeline including its source configuration.
Args:
pipeline: Pipeline object to retrieve state for.
Expand All @@ -88,24 +87,14 @@ def _get_pipeline_state(self, pipeline):
regional_client = self.regional_clients[pipeline.region]
pipeline_info = regional_client.get_pipeline(name=pipeline.name)
source_info = pipeline_info["pipeline"]["stages"][0]["actions"][0]
repository_id = source_info["configuration"].get("FullRepositoryId", "")
pipeline.source = Source(
type=source_info["actionTypeId"]["provider"],
location=source_info["configuration"].get("FullRepositoryId", ""),
location=repository_id,
configuration=source_info["configuration"],
)

# Get tags using list_tags_for_resource API
try:
tags_response = regional_client.list_tags_for_resource(
resourceArn=pipeline.arn
)
pipeline.tags = tags_response.get("tags", [])
except ClientError as error:
logger.error(
f"Error getting tags for pipeline {pipeline.name}: {error}"
)
pipeline.tags = []

self._list_tags_for_resource(pipeline) # 태그 조회 호출 추가
pipeline.status_extended = f"CodePipeline {pipeline.name} source repository {repository_id} is private."
except ClientError as error:
logger.error(
f"{pipeline.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
Expand All @@ -115,6 +104,32 @@ def _get_pipeline_state(self, pipeline):
f"{pipeline.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def _list_tags_for_resource(self, resource):
"""Lists tags for a given resource.
Args:
resource: Resource object to retrieve tags for.
"""
logger.info("CodePipeline - Listing Tags...")
try:
tags_response = self.regional_clients[
resource.region
].list_tags_for_resource(ResourceArn=resource.arn)
resource.tags = tags_response.get("Tags", []) # get 메서드로 변경
except ClientError as error:
if error.response["Error"]["Code"] == "ResourceNotFoundException":
logger.warning(
f"{resource.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: Resource {resource.arn} not found while listing tags"
)
else:
logger.error(
f"{resource.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{resource.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)


class Source(BaseModel):
"""Model representing a pipeline source configuration.
Expand Down Expand Up @@ -146,3 +161,4 @@ class Pipeline(BaseModel):
region: str
source: Optional[Source]
tags: Optional[list] = []
status_extended: Optional[str] = None
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,10 @@ class Test_codepipeline_project_repo_private:
"""

def test_pipeline_private_repo(self):
"""Test detection of private GitHub repository in CodePipeline.
Tests that the check correctly identifies a private GitHub repository
when HTTP 404 response is received.
Returns:
None
"""Test detection of private repository in CodePipeline.
Tests that the check correctly identifies a private repository
when both GitHub and GitLab return 404.
"""

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION]),
Expand All @@ -36,6 +31,7 @@ def test_pipeline_private_repo(self):
pipeline_name = "test-pipeline"
pipeline_arn = f"arn:aws:codepipeline:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:pipeline/{pipeline_name}"
connection_arn = f"arn:aws:codestar-connections:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:connection/test-connection"
repo_id = "prowler-cloud/prowler-private"

codepipeline_client.pipelines = {
pipeline_arn: Pipeline(
Expand All @@ -44,9 +40,9 @@ def test_pipeline_private_repo(self):
region=AWS_REGION,
source=Source(
type="CodeStarSourceConnection",
location="prowler-cloud/prowler-private",
location=repo_id,
configuration={
"FullRepositoryId": "prowler-cloud/prowler-private",
"FullRepositoryId": repo_id,
"ConnectionArn": connection_arn,
},
),
Expand All @@ -70,12 +66,12 @@ def test_pipeline_private_repo(self):
"Connection": {"ProviderType": "GitHub"}
}

# Mock URL check response for private repo
mock_response = mock.MagicMock()
mock_response.getcode.return_value = 404
mock_urlopen.side_effect = urllib.error.HTTPError(
url="", code=404, msg="", hdrs={}, fp=None
)
def mock_urlopen_side_effect(req, context=None):
raise urllib.error.HTTPError(
url="", code=404, msg="", hdrs={}, fp=None
)

mock_urlopen.side_effect = mock_urlopen_side_effect

from prowler.providers.aws.services.codepipeline.codepipeline_project_repo_private.codepipeline_project_repo_private import (
codepipeline_project_repo_private,
Expand All @@ -88,19 +84,18 @@ def test_pipeline_private_repo(self):
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CodePipeline {pipeline_name} source repository prowler-cloud/prowler-private is private."
== f"CodePipeline {pipeline_name} source repository {repo_id} is private."
)
assert result[0].resource_id == pipeline_name
assert result[0].resource_arn == pipeline_arn
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION

def test_pipeline_public_repo(self):
def test_pipeline_public_github_repo(self):
"""Test detection of public GitHub repository in CodePipeline.
Tests that the check correctly identifies a public GitHub repository
when HTTP 200 response is received.
Returns:
None
when GitHub returns 200.
"""

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION]),
Expand All @@ -109,6 +104,7 @@ def test_pipeline_public_repo(self):
pipeline_name = "test-pipeline"
pipeline_arn = f"arn:aws:codepipeline:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:pipeline/{pipeline_name}"
connection_arn = f"arn:aws:codestar-connections:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:connection/test-connection"
repo_id = "prowler-cloud/prowler"

codepipeline_client.pipelines = {
pipeline_arn: Pipeline(
Expand All @@ -117,9 +113,9 @@ def test_pipeline_public_repo(self):
region=AWS_REGION,
source=Source(
type="CodeStarSourceConnection",
location="prowler-cloud/prowler",
location=repo_id,
configuration={
"FullRepositoryId": "prowler-cloud/prowler",
"FullRepositoryId": repo_id,
"ConnectionArn": connection_arn,
},
),
Expand All @@ -143,88 +139,18 @@ def test_pipeline_public_repo(self):
"Connection": {"ProviderType": "GitHub"}
}

# Mock URL check response for public repo
mock_response = mock.MagicMock()
mock_response.getcode.return_value = 200
mock_response.geturl.return_value = (
"https://github.com/prowler-cloud/prowler"
)
mock_urlopen.return_value = mock_response

from prowler.providers.aws.services.codepipeline.codepipeline_project_repo_private.codepipeline_project_repo_private import (
codepipeline_project_repo_private,
)

check = codepipeline_project_repo_private()
result = check.execute()

assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CodePipeline {pipeline_name} source repository is public: https://github.com/prowler-cloud/prowler"
)

def test_pipeline_private_gitlab_repo(self):
"""Test detection of private GitLab repository in CodePipeline.
Tests that the check correctly identifies a private GitLab repository
when redirected to sign-in page.
Returns:
None
Note:
GitLab returns 200 but redirects to sign-in page for private repos.
"""

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION]),
):
codepipeline_client = mock.MagicMock
pipeline_name = "test-pipeline"
pipeline_arn = f"arn:aws:codepipeline:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:pipeline/{pipeline_name}"
connection_arn = f"arn:aws:codestar-connections:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:connection/test-connection"

codepipeline_client.pipelines = {
pipeline_arn: Pipeline(
name=pipeline_name,
arn=pipeline_arn,
region=AWS_REGION,
source=Source(
type="CodeStarSourceConnection",
location="prowler-cloud/prowler-private",
configuration={
"FullRepositoryId": "prowler-cloud/prowler-private",
"ConnectionArn": connection_arn,
},
),
tags=[],
)
}
mock_response.geturl.return_value = f"https://github.com/{repo_id}"

with mock.patch(
"prowler.providers.aws.services.codepipeline.codepipeline_service.CodePipeline",
codepipeline_client,
), mock.patch(
"prowler.providers.aws.services.codepipeline.codepipeline_project_repo_private.codepipeline_project_repo_private.codepipeline_client",
codepipeline_client,
), mock.patch(
"boto3.client"
) as mock_client, mock.patch(
"urllib.request.urlopen"
) as mock_urlopen:
mock_connection = mock_client.return_value
mock_connection.get_connection.return_value = {
"Connection": {"ProviderType": "GitLab"}
}
def mock_urlopen_side_effect(req, context=None):
if "github.com" in req.get_full_url():
return mock_response
raise urllib.error.HTTPError(
url="", code=404, msg="", hdrs={}, fp=None
)

# Mock URL check response for private repo
mock_response = mock.MagicMock()
mock_response.getcode.return_value = 200
mock_response.geturl.return_value = "https://gitlab.com/sign_in"
mock_urlopen.return_value = mock_response
mock_urlopen.side_effect = mock_urlopen_side_effect

from prowler.providers.aws.services.codepipeline.codepipeline_project_repo_private.codepipeline_project_repo_private import (
codepipeline_project_repo_private,
Expand All @@ -234,10 +160,10 @@ def test_pipeline_private_gitlab_repo(self):
result = check.execute()

assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CodePipeline {pipeline_name} source repository prowler-cloud/prowler-private is private."
== f"CodePipeline {pipeline_name} source repository is public: https://github.com/{repo_id}"
)
assert result[0].resource_id == pipeline_name
assert result[0].resource_arn == pipeline_arn
Expand All @@ -246,15 +172,8 @@ def test_pipeline_private_gitlab_repo(self):

def test_pipeline_public_gitlab_repo(self):
"""Test detection of public GitLab repository in CodePipeline.
Tests that the check correctly identifies a public GitLab repository
when direct access is possible.
Returns:
None
Note:
GitLab returns 200 with direct repo access for public repos.
when GitLab returns 200 without sign_in redirect.
"""
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
Expand All @@ -264,6 +183,7 @@ def test_pipeline_public_gitlab_repo(self):
pipeline_name = "test-pipeline"
pipeline_arn = f"arn:aws:codepipeline:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:pipeline/{pipeline_name}"
connection_arn = f"arn:aws:codestar-connections:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:connection/test-connection"
repo_id = "prowler-cloud/prowler-private"

codepipeline_client.pipelines = {
pipeline_arn: Pipeline(
Expand All @@ -272,9 +192,9 @@ def test_pipeline_public_gitlab_repo(self):
region=AWS_REGION,
source=Source(
type="CodeStarSourceConnection",
location="prowler-cloud/prowler-private",
location=repo_id,
configuration={
"FullRepositoryId": "prowler-cloud/prowler-private",
"FullRepositoryId": repo_id,
"ConnectionArn": connection_arn,
},
),
Expand All @@ -298,11 +218,18 @@ def test_pipeline_public_gitlab_repo(self):
"Connection": {"ProviderType": "GitLab"}
}

# Mock URL check response for public repo
mock_response = mock.MagicMock()
mock_response.getcode.return_value = 200
mock_response.geturl.return_value = "https://gitlab.com/test/repo"
mock_urlopen.return_value = mock_response
mock_response.geturl.return_value = f"https://gitlab.com/{repo_id}"

def mock_urlopen_side_effect(req, context=None):
if "gitlab.com" in req.get_full_url():
return mock_response
raise urllib.error.HTTPError(
url="", code=404, msg="", hdrs={}, fp=None
)

mock_urlopen.side_effect = mock_urlopen_side_effect

from prowler.providers.aws.services.codepipeline.codepipeline_project_repo_private.codepipeline_project_repo_private import (
codepipeline_project_repo_private,
Expand All @@ -315,7 +242,7 @@ def test_pipeline_public_gitlab_repo(self):
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CodePipeline {pipeline_name} source repository is public: https://gitlab.com/prowler-cloud/prowler-private"
== f"CodePipeline {pipeline_name} source repository is public: https://gitlab.com/{repo_id}"
)
assert result[0].resource_id == pipeline_name
assert result[0].resource_arn == pipeline_arn
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
pipeline_name = "test-pipeline"
pipeline_arn = f"arn:{AWS_COMMERCIAL_PARTITION}:codepipeline:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:pipeline/{pipeline_name}"
source_type = "CodeStarSourceConnection"
repository_id = "test/repo"
repository_id = "prowler-cloud/prowler-private"
connection_arn = f"arn:{AWS_COMMERCIAL_PARTITION}:codestar-connections:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:connection/test"

# Mocking API calls
Expand Down Expand Up @@ -53,8 +53,9 @@ def mock_make_api_call(self, operation_name, kwarg):
}
],
},
"tags": [{"key": "Environment", "value": "Test"}],
}
elif operation_name == "ListTagsForResource":
return {"Tags": [{"key": "Environment", "value": "Test"}]}
return make_api_call(self, operation_name, kwarg)


Expand Down Expand Up @@ -102,3 +103,7 @@ def test_codepipeline_service(self):
# Test tags
assert pipeline.tags[0]["key"] == "Environment"
assert pipeline.tags[0]["value"] == "Test"

# Test status extended
expected_status = f"CodePipeline {pipeline_name} source repository prowler-cloud/prowler-private is private."
assert pipeline.status_extended == expected_status

0 comments on commit 5bfc7f5

Please sign in to comment.