From 3181d26c851c9e47ab8fbfa67bc86affda17a7d5 Mon Sep 17 00:00:00 2001 From: Omri Yoffe Date: Tue, 17 Dec 2024 16:33:01 +0200 Subject: [PATCH 1/4] serverless definitions context --- .../graph_builder/definition_context.py | 55 +++++++++++++++++-- 1 file changed, 51 insertions(+), 4 deletions(-) diff --git a/checkov/serverless/graph_builder/definition_context.py b/checkov/serverless/graph_builder/definition_context.py index 8618135afa6..1b9f2deade9 100644 --- a/checkov/serverless/graph_builder/definition_context.py +++ b/checkov/serverless/graph_builder/definition_context.py @@ -1,14 +1,61 @@ from __future__ import annotations -from typing import Any +from typing import cast, Any + +from checkov.common.util.consts import LINE_FIELD_NAMES +from checkov.common.parsers.node import StrNode +from checkov.common.util.consts import START_LINE, END_LINE +from checkov.common.util.suppression import collect_suppressions_for_report +from checkov.serverless.utils import ServerlessElements def build_definitions_context(definitions: dict[str, dict[str, Any]], definitions_raw: dict[str, list[tuple[int, str]]] ) -> dict[str, dict[str, Any]]: - return {} + definitions_context: dict[str, dict[str, Any]] = {} + for file_path, file_definitions in definitions.items(): + definitions_context[file_path] = {} + for definition_attribute, definition_value in file_definitions.items(): + if definition_attribute not in ServerlessElements._member_map_.values(): + continue + definitions_context[file_path][definition_attribute] = {} + if isinstance(definition_value, dict): + for resource_key, resource_attributes in definition_value.items(): + add_resource_to_definitions_context(definitions_context, resource_key, resource_attributes, + definition_attribute, definitions_raw, file_path) + elif isinstance(definition_value, list): + for resource in definition_value: + add_resource_to_definitions_context(definitions_context, '', resource, definition_attribute, + definitions_raw, file_path) + + elif isinstance(definition_value, StrNode): + add_resource_to_definitions_context(definitions_context, definition_attribute, definition_value, + definition_attribute, + definitions_raw, file_path) + return definitions_context def add_resource_to_definitions_context(definitions_context: dict[str, dict[str, Any]], resource_key: str, - resource_attributes: dict[str, Any], definition_attribute: str, + resource_attributes: dict[str, Any] | StrNode, definition_attribute: str, definitions_raw: dict[str, Any], file_path: str) -> None: - pass + if not isinstance(resource_attributes, dict) and not isinstance(resource_attributes, StrNode): + return + + start_line = resource_attributes[ + START_LINE] if START_LINE in resource_attributes else resource_attributes.start_mark.line + end_line = resource_attributes[END_LINE] if END_LINE in resource_attributes else resource_attributes.end_mark.line + definition_resource = {} + + if resource_key is None: + resource_key = f"{resource_attributes.get('type')}.{resource_attributes.get('name')}" + int_start_line = cast(int, start_line) + int_end_line = cast(int, end_line) + code_lines_for_suppressions_check = definitions_raw[file_path][int_start_line: int_end_line] + definition_resource['skipped_checks'] = collect_suppressions_for_report( + code_lines=code_lines_for_suppressions_check) + if 'type' in resource_attributes: + definition_resource["type"] = resource_attributes.get('type') + + definition_resource["code_lines"] = definitions_raw[file_path][start_line: end_line + 1] + definition_resource["start_line"] = start_line + 1 + definition_resource["end_line"] = end_line + 1 + definitions_context[file_path][definition_attribute][resource_key] = definition_resource From 49f819ef20103f5b85392b916705faa8c8e51475 Mon Sep 17 00:00:00 2001 From: Omri Yoffe Date: Tue, 17 Dec 2024 16:46:51 +0200 Subject: [PATCH 2/4] typing: --- .../graph_builder/definition_context.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/checkov/serverless/graph_builder/definition_context.py b/checkov/serverless/graph_builder/definition_context.py index 1b9f2deade9..a7d115468fd 100644 --- a/checkov/serverless/graph_builder/definition_context.py +++ b/checkov/serverless/graph_builder/definition_context.py @@ -2,7 +2,6 @@ from typing import cast, Any -from checkov.common.util.consts import LINE_FIELD_NAMES from checkov.common.parsers.node import StrNode from checkov.common.util.consts import START_LINE, END_LINE from checkov.common.util.suppression import collect_suppressions_for_report @@ -40,19 +39,27 @@ def add_resource_to_definitions_context(definitions_context: dict[str, dict[str, if not isinstance(resource_attributes, dict) and not isinstance(resource_attributes, StrNode): return - start_line = resource_attributes[ - START_LINE] if START_LINE in resource_attributes else resource_attributes.start_mark.line - end_line = resource_attributes[END_LINE] if END_LINE in resource_attributes else resource_attributes.end_mark.line + start_line = None + end_line = None + + if isinstance(resource_attributes, dict): + start_line = resource_attributes[START_LINE] + end_line = resource_attributes[END_LINE] + + elif isinstance(resource_attributes, StrNode): + start_line = resource_attributes.start_mark.line + end_line = resource_attributes.end_mark.line + definition_resource = {} - if resource_key is None: + if resource_key is None and isinstance(resource_attributes, dict): resource_key = f"{resource_attributes.get('type')}.{resource_attributes.get('name')}" int_start_line = cast(int, start_line) int_end_line = cast(int, end_line) code_lines_for_suppressions_check = definitions_raw[file_path][int_start_line: int_end_line] definition_resource['skipped_checks'] = collect_suppressions_for_report( code_lines=code_lines_for_suppressions_check) - if 'type' in resource_attributes: + if isinstance(resource_attributes, dict) and 'type' in resource_attributes: definition_resource["type"] = resource_attributes.get('type') definition_resource["code_lines"] = definitions_raw[file_path][start_line: end_line + 1] From 1bfa487aa292122b375d65c0648bf2e0a7b6fe07 Mon Sep 17 00:00:00 2001 From: Omri Yoffe Date: Tue, 17 Dec 2024 17:25:48 +0200 Subject: [PATCH 3/4] different types line numbers --- .../graph_builder/definition_context.py | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/checkov/serverless/graph_builder/definition_context.py b/checkov/serverless/graph_builder/definition_context.py index a7d115468fd..15562ccd361 100644 --- a/checkov/serverless/graph_builder/definition_context.py +++ b/checkov/serverless/graph_builder/definition_context.py @@ -2,8 +2,8 @@ from typing import cast, Any -from checkov.common.parsers.node import StrNode -from checkov.common.util.consts import START_LINE, END_LINE +from checkov.common.parsers.node import StrNode, ListNode +from checkov.common.util.consts import START_LINE, END_LINE, LINE_FIELD_NAMES from checkov.common.util.suppression import collect_suppressions_for_report from checkov.serverless.utils import ServerlessElements @@ -34,35 +34,38 @@ def build_definitions_context(definitions: dict[str, dict[str, Any]], definition def add_resource_to_definitions_context(definitions_context: dict[str, dict[str, Any]], resource_key: str, - resource_attributes: dict[str, Any] | StrNode, definition_attribute: str, + resource_attributes: dict[str, Any] | ListNode | StrNode, definition_attribute: str, definitions_raw: dict[str, Any], file_path: str) -> None: - if not isinstance(resource_attributes, dict) and not isinstance(resource_attributes, StrNode): + if resource_key in LINE_FIELD_NAMES: return - start_line = None - end_line = None - if isinstance(resource_attributes, dict): - start_line = resource_attributes[START_LINE] - end_line = resource_attributes[END_LINE] + start_line = resource_attributes[START_LINE] - 1 + end_line = resource_attributes[END_LINE] - 1 - elif isinstance(resource_attributes, StrNode): + elif isinstance(resource_attributes, ListNode): start_line = resource_attributes.start_mark.line end_line = resource_attributes.end_mark.line - definition_resource = {} + elif isinstance(resource_attributes, StrNode): + start_line = resource_attributes.start_mark.line + 1 + end_line = resource_attributes.end_mark.line + 1 + + else: + return + + definition_resource = {"start_line": start_line, "end_line": end_line} if resource_key is None and isinstance(resource_attributes, dict): resource_key = f"{resource_attributes.get('type')}.{resource_attributes.get('name')}" - int_start_line = cast(int, start_line) - int_end_line = cast(int, end_line) + int_start_line = cast(int, definition_resource["start_line"]) + int_end_line = cast(int, definition_resource["end_line"]) code_lines_for_suppressions_check = definitions_raw[file_path][int_start_line: int_end_line] definition_resource['skipped_checks'] = collect_suppressions_for_report( code_lines=code_lines_for_suppressions_check) if isinstance(resource_attributes, dict) and 'type' in resource_attributes: definition_resource["type"] = resource_attributes.get('type') - definition_resource["code_lines"] = definitions_raw[file_path][start_line: end_line + 1] - definition_resource["start_line"] = start_line + 1 - definition_resource["end_line"] = end_line + 1 + definition_resource["code_lines"] = definitions_raw[file_path][start_line - 1: end_line] + definitions_context[file_path][definition_attribute][resource_key] = definition_resource From b366188488e97e082ff686ae5a4d87d93f67310c Mon Sep 17 00:00:00 2001 From: Omri Yoffe Date: Tue, 17 Dec 2024 17:35:36 +0200 Subject: [PATCH 4/4] mypy --- checkov/serverless/graph_builder/definition_context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/checkov/serverless/graph_builder/definition_context.py b/checkov/serverless/graph_builder/definition_context.py index 15562ccd361..3760a30ee6e 100644 --- a/checkov/serverless/graph_builder/definition_context.py +++ b/checkov/serverless/graph_builder/definition_context.py @@ -14,7 +14,7 @@ def build_definitions_context(definitions: dict[str, dict[str, Any]], definition for file_path, file_definitions in definitions.items(): definitions_context[file_path] = {} for definition_attribute, definition_value in file_definitions.items(): - if definition_attribute not in ServerlessElements._member_map_.values(): + if definition_attribute not in [str(e) for e in ServerlessElements]: continue definitions_context[file_path][definition_attribute] = {} if isinstance(definition_value, dict):