Skip to content

Commit

Permalink
THREAT-397 Reformat deep_get(event to event.deep_get( (#1374)
Browse files Browse the repository at this point in the history
Co-authored-by: Ben Airey <[email protected]>
Co-authored-by: Ariel <[email protected]>
  • Loading branch information
3 people authored Oct 7, 2024
1 parent a61a96f commit 039ad9a
Show file tree
Hide file tree
Showing 280 changed files with 1,160 additions and 1,533 deletions.
33 changes: 18 additions & 15 deletions .scripts/mitre_mapping_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,26 @@
# All MITRE Tags must match this regex pattern
MITRE_PATTERN = re.compile("^TA\d+\:T\d+(\.\d+)?$")


def main(path: Path) -> bool:
# Load Repo
analysis_items = load_analysis_specs([path], ignore_files=[])

items_with_invalid_mappings = [] # Record all items with bad tags
items_with_invalid_mappings = [] # Record all items with bad tags
for analysis_item in analysis_items:
rel_path = analysis_item[0] # Relative path to YAML file
spec = analysis_item[2] # YAML spec as a dict
rel_path = analysis_item[0] # Relative path to YAML file
spec = analysis_item[2] # YAML spec as a dict

bad_tags = [] # Record the invalid tags for this analysis item
bad_tags = [] # Record the invalid tags for this analysis item
if reports := spec.get("Reports"):
if mitre := reports.get("MITRE ATT&CK"):
for mapping in mitre:
if not MITRE_PATTERN.match(mapping):
bad_tags.append(mapping)

if bad_tags:
items_with_invalid_mappings.append({
"rel_path": rel_path,
"bad_tags": bad_tags
})

items_with_invalid_mappings.append({"rel_path": rel_path, "bad_tags": bad_tags})

if items_with_invalid_mappings:
print("❌ Some items had invalid MITRE mapping formats:")
print()
Expand All @@ -42,16 +40,21 @@ def main(path: Path) -> bool:
print("\t" + bad_tag)
print()

print(("To ensure that your MITRE mappings are correctly displayed in the Panther "
"console, make sure your MITRE mappings are formatted like 'TA0000:T0000'."))
print(
(
"To ensure that your MITRE mappings are correctly displayed in the Panther "
"console, make sure your MITRE mappings are formatted like 'TA0000:T0000'."
)
)
else:
print("✅ No invalid MITRE mappings found! You're in the clear! 👍")

return bool(items_with_invalid_mappings)


if __name__ == "__main__":
path = Path.cwd() # Default to current directory
path = Path.cwd() # Default to current directory
if len(sys.argv) > 1:
path = Path(sys.argv[1])
if main(path):
exit(1) # Exit with error if issues were found
exit(1) # Exit with error if issues were found
23 changes: 10 additions & 13 deletions global_helpers/gcp_base_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get


def get_info(event):
fields = {
"principal": "protoPayload.authenticationInfo.principalEmail",
Expand All @@ -9,15 +6,15 @@ def get_info(event):
"user_agent": "protoPayload.requestMetadata.callerSuppliedUserAgent",
"method_name": "protoPayload.methodName",
}
return {name: deep_get(event, *(path.split("."))) for name, path in fields.items()}
return {name: event.deep_get(*(path.split("."))) for name, path in fields.items()}


def get_k8s_info(event):
"""
Get GCP K8s info such as pod, authorized user etc.
return a tuple of strings
"""
pod_slug = deep_get(event, "protoPayload", "resourceName")
pod_slug = event.deep_get("protoPayload", "resourceName")
# core/v1/namespaces/<namespace>/pods/<pod-id>/<action>
_, _, _, namespace, _, pod, _ = pod_slug.split("/")
return get_info(event) | {"namespace": namespace, "pod": pod}
Expand All @@ -33,17 +30,17 @@ def get_flow_log_info(event):
"bytes_sent": "jsonPayload.bytes_sent",
"reporter": "jsonPayload.reporter",
}
return {name: deep_get(event, *(path.split("."))) for name, path in fields.items()}
return {name: event.deep_get(*(path.split("."))) for name, path in fields.items()}


def gcp_alert_context(event):
return {
"project": deep_get(event, "resource", "labels", "project_id", default=""),
"principal": deep_get(
event, "protoPayload", "authenticationInfo", "principalEmail", default=""
"project": event.deep_get("resource", "labels", "project_id", default=""),
"principal": event.deep_get(
"protoPayload", "authenticationInfo", "principalEmail", default=""
),
"caller_ip": deep_get(event, "protoPayload", "requestMetadata", "callerIP", default=""),
"methodName": deep_get(event, "protoPayload", "methodName", default=""),
"resourceName": deep_get(event, "protoPayload", "resourceName", default=""),
"serviceName": deep_get(event, "protoPayload", "serviceName", default=""),
"caller_ip": event.deep_get("protoPayload", "requestMetadata", "callerIP", default=""),
"methodName": event.deep_get("protoPayload", "methodName", default=""),
"resourceName": event.deep_get("protoPayload", "resourceName", default=""),
"serviceName": event.deep_get("protoPayload", "serviceName", default=""),
}
5 changes: 1 addition & 4 deletions global_helpers/global_filter_auth0.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all Auth0 detections
Expand All @@ -20,7 +17,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
# # not all Auth0 enterprise events have org
# # example: request domain
# # if we don't know the request_domain, we want default behavior to be to alert on this event.
# request_domain = deep_get(event, "data", "details", "request", "channel", default="")
# request_domain = event.deep_get("data", "details", "request", "channel", default="")
# return request_domain in ["https://manage.auth0.com/", "https://mycompany.auth0.com", ""]
#
return True
5 changes: 1 addition & 4 deletions global_helpers/global_filter_azuresignin.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all AzureSignIn detections
Expand All @@ -17,7 +14,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
#
# # example: event['tenantId']
# # if tenantId were missing, we want default behavior to be to alert on this event.
# tenant_id = deep_get(event, "tenantId", default="")
# tenant_id = event.get("tenantId", "")
# return event_origin in ["333333eb-a222-33cc-9baf-4a1111111111", ""]
#
return True
3 changes: 0 additions & 3 deletions global_helpers/global_filter_cloudflare.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all cloudflare detections
Expand Down
5 changes: 1 addition & 4 deletions global_helpers/global_filter_github.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all github detections
Expand All @@ -19,7 +16,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
#
# # not all github enterprise events have org
# # example: enterprise.self_hosted_runner_online
# org = deep_get(event, "org", default="")
# org = event.get("org", "")
# return org in ["my-prod-org", ""]
#
return True
5 changes: 1 addition & 4 deletions global_helpers/global_filter_notion.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all Notion detections
Expand All @@ -17,7 +14,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
#
# # example: workspace_id
# # if we don't know the workspace_id, we want default behavior to be to alert on this event.
# workspace_id = deep_get(event, "workspace_id", default="")
# workspace_id = event.get("workspace_id", "")
# return workspace_id in ["ea65b016-6abc-4dcf-808b-e000099999999", ""]
#
return True
5 changes: 1 addition & 4 deletions global_helpers/global_filter_snyk.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all snyk detections
Expand All @@ -16,7 +13,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
#
# # not all snyk audit events have orgId & projectId
# # example: group.user.add, sometimes api.access
# org = deep_get(event, "orgId", default="")
# org = event.get("orgId", "")
# return org in ["21111111-a222-4eee-8ddd-a99999999999", ""]
#
return True
5 changes: 1 addition & 4 deletions global_helpers/global_filter_tailscale.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all Tailscale detections
Expand All @@ -17,7 +14,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
#
# # example: event.origin
# # if we don't know the event_origin, we want default behavior to be to alert on this event.
# event_origin = deep_get(event, "event", "origin", default="")
# event_origin = event.deep_get("event", "origin", default="")
# return event_origin in ["ADMIN_CONSOLE", ""]
#
return True
5 changes: 1 addition & 4 deletions global_helpers/global_filter_tines.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all Tines detections
Expand All @@ -14,7 +11,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
# 1. the specific tenant_id mentioned.
# 2. events where tenant_id is undefined
#
# tenant_id = deep_get(event, "tenant_id", default="")
# tenant_id = event.get("tenant_id", "")
# return tenant_id in ["1234", ""]
#
return True
Loading

0 comments on commit 039ad9a

Please sign in to comment.