From 1e8fdaab7f46860e7f588c8e50cd5a365024a195 Mon Sep 17 00:00:00 2001 From: pedrooot Date: Fri, 13 Dec 2024 10:13:43 +0100 Subject: [PATCH] feat(gcp): resolve comments --- prowler/providers/gcp/gcp_provider.py | 59 +++++++++++++----------- tests/providers/gcp/gcp_provider_test.py | 28 +++++------ 2 files changed, 46 insertions(+), 41 deletions(-) diff --git a/prowler/providers/gcp/gcp_provider.py b/prowler/providers/gcp/gcp_provider.py index 86ede0e8fbd..e5dc4383447 100644 --- a/prowler/providers/gcp/gcp_provider.py +++ b/prowler/providers/gcp/gcp_provider.py @@ -330,10 +330,16 @@ def setup_session( Setup the GCP session with the provided credentials file or service account to impersonate Args: - credentials_file: str - service_account: dict - gcp_credentials: dict - service_account_key: dict + credentials_file: str -> The credentials file path used to authenticate + service_account: dict -> The service account to impersonate + gcp_credentials: dict -> The GCP credentials following the format: + { + "client_id": str, + "client_secret": str, + "refresh_token": str, + "type": str + } + service_account_key: dict -> The service account key, used to authenticate Returns: Credentials object and default project ID @@ -369,7 +375,6 @@ def setup_session( ) if service_account_key: - logger.info("Using service account key") logger.info( "GCP provider: Setting credentials from service account key..." ) @@ -757,6 +762,28 @@ def update_projects_with_organizations(self): f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) + def is_project_matching(self, input_project: str, project_to_match: str) -> bool: + """ + Check if the input project matches the project to match + + Args: + input_project: str + project_to_match: str + + Returns: + bool + + Usage: + >>> GcpProvider.is_project_matching(input_project, project_to_match) + """ + return ( + "*" in input_project + and re.search( + "." + input_project if input_project.startswith("*") else input_project, + project_to_match, + ) + ) or input_project == project_to_match + @staticmethod def validate_static_arguments( client_id: str = None, client_secret: str = None, refresh_token: str = None @@ -788,28 +815,6 @@ def validate_static_arguments( "type": "authorized_user", } - def is_project_matching(self, input_project: str, project_to_match: str) -> bool: - """ - Check if the input project matches the project to match - - Args: - input_project: str - project_to_match: str - - Returns: - bool - - Usage: - >>> GcpProvider.is_project_matching(input_project, project_to_match) - """ - return ( - "*" in input_project - and re.search( - "." + input_project if input_project.startswith("*") else input_project, - project_to_match, - ) - ) or input_project == project_to_match - @staticmethod def validate_project_id(provider_id: str, credentials: str = None) -> None: """ diff --git a/tests/providers/gcp/gcp_provider_test.py b/tests/providers/gcp/gcp_provider_test.py index 62dc8ee85a4..c82e5320a8a 100644 --- a/tests/providers/gcp/gcp_provider_test.py +++ b/tests/providers/gcp/gcp_provider_test.py @@ -789,6 +789,20 @@ def test_init_only_client_id(self): GcpProvider(client_id="test-client-id") assert "client_secret and refresh_token are required" in e.value.args[0] + def test_validate_static_arguments(self): + output = GcpProvider.validate_static_arguments( + client_id="test-client-id", + client_secret="test-client-secret", + refresh_token="test-refresh-token", + ) + + assert output == { + "client_id": "test-client-id", + "client_secret": "test-client-secret", + "refresh_token": "test-refresh-token", + "type": "authorized_user", + } + def test_test_connection_with_exception(self): with patch( "prowler.providers.gcp.gcp_provider.GcpProvider.setup_session", @@ -815,20 +829,6 @@ def test_test_connection_with_exception_service_account_key(self): assert e.type == GCPTestConnectionError assert "Test exception" in e.value.args[0] - def test_validate_static_arguments(self): - output = GcpProvider.validate_static_arguments( - client_id="test-client-id", - client_secret="test-client-secret", - refresh_token="test-refresh-token", - ) - - assert output == { - "client_id": "test-client-id", - "client_secret": "test-client-secret", - "refresh_token": "test-refresh-token", - "type": "authorized_user", - } - def test_test_connection_valid_project_id(self): project_id = "test-project-id" mocked_service = MagicMock()