Skip to content

Commit

Permalink
THREAT-397 Reformat deep_get(event to event.deep_get(
Browse files Browse the repository at this point in the history
  • Loading branch information
akozlovets098 committed Oct 2, 2024
1 parent 7a2bec6 commit 1cbcd51
Show file tree
Hide file tree
Showing 260 changed files with 1,086 additions and 1,461 deletions.
33 changes: 18 additions & 15 deletions .scripts/mitre_mapping_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,26 @@
# All MITRE Tags must match this regex pattern
MITRE_PATTERN = re.compile("^TA\d+\:T\d+(\.\d+)?$")


def main(path: Path) -> bool:
# Load Repo
analysis_items = load_analysis_specs([path], ignore_files=[])

items_with_invalid_mappings = [] # Record all items with bad tags
items_with_invalid_mappings = [] # Record all items with bad tags
for analysis_item in analysis_items:
rel_path = analysis_item[0] # Relative path to YAML file
spec = analysis_item[2] # YAML spec as a dict
rel_path = analysis_item[0] # Relative path to YAML file
spec = analysis_item[2] # YAML spec as a dict

bad_tags = [] # Record the invalid tags for this analysis item
bad_tags = [] # Record the invalid tags for this analysis item
if reports := spec.get("Reports"):
if mitre := reports.get("MITRE ATT&CK"):
for mapping in mitre:
if not MITRE_PATTERN.match(mapping):
bad_tags.append(mapping)

if bad_tags:
items_with_invalid_mappings.append({
"rel_path": rel_path,
"bad_tags": bad_tags
})

items_with_invalid_mappings.append({"rel_path": rel_path, "bad_tags": bad_tags})

if items_with_invalid_mappings:
print("❌ Some items had invalid MITRE mapping formats:")
print()
Expand All @@ -42,16 +40,21 @@ def main(path: Path) -> bool:
print("\t" + bad_tag)
print()

print(("To ensure that your MITRE mappings are correctly displayed in the Panther "
"console, make sure your MITRE mappings are formatted like 'TA0000:T0000'."))
print(
(
"To ensure that your MITRE mappings are correctly displayed in the Panther "
"console, make sure your MITRE mappings are formatted like 'TA0000:T0000'."
)
)
else:
print("✅ No invalid MITRE mappings found! You're in the clear! 👍")

return bool(items_with_invalid_mappings)


if __name__ == "__main__":
path = Path.cwd() # Default to current directory
path = Path.cwd() # Default to current directory
if len(sys.argv) > 1:
path = Path(sys.argv[1])
if main(path):
exit(1) # Exit with error if issues were found
exit(1) # Exit with error if issues were found
23 changes: 10 additions & 13 deletions global_helpers/gcp_base_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get


def get_info(event):
fields = {
"principal": "protoPayload.authenticationInfo.principalEmail",
Expand All @@ -9,15 +6,15 @@ def get_info(event):
"user_agent": "protoPayload.requestMetadata.callerSuppliedUserAgent",
"method_name": "protoPayload.methodName",
}
return {name: deep_get(event, *(path.split("."))) for name, path in fields.items()}
return {name: event.deep_get(*(path.split("."))) for name, path in fields.items()}


def get_k8s_info(event):
"""
Get GCP K8s info such as pod, authorized user etc.
return a tuple of strings
"""
pod_slug = deep_get(event, "protoPayload", "resourceName")
pod_slug = event.deep_get("protoPayload", "resourceName")
# core/v1/namespaces/<namespace>/pods/<pod-id>/<action>
_, _, _, namespace, _, pod, _ = pod_slug.split("/")
return get_info(event) | {"namespace": namespace, "pod": pod}
Expand All @@ -33,17 +30,17 @@ def get_flow_log_info(event):
"bytes_sent": "jsonPayload.bytes_sent",
"reporter": "jsonPayload.reporter",
}
return {name: deep_get(event, *(path.split("."))) for name, path in fields.items()}
return {name: event.deep_get(*(path.split("."))) for name, path in fields.items()}


def gcp_alert_context(event):
return {
"project": deep_get(event, "resource", "labels", "project_id", default=""),
"principal": deep_get(
event, "protoPayload", "authenticationInfo", "principalEmail", default=""
"project": event.deep_get("resource", "labels", "project_id", default=""),
"principal": event.deep_get(
"protoPayload", "authenticationInfo", "principalEmail", default=""
),
"caller_ip": deep_get(event, "protoPayload", "requestMetadata", "callerIP", default=""),
"methodName": deep_get(event, "protoPayload", "methodName", default=""),
"resourceName": deep_get(event, "protoPayload", "resourceName", default=""),
"serviceName": deep_get(event, "protoPayload", "serviceName", default=""),
"caller_ip": event.deep_get("protoPayload", "requestMetadata", "callerIP", default=""),
"methodName": event.deep_get("protoPayload", "methodName", default=""),
"resourceName": event.deep_get("protoPayload", "resourceName", default=""),
"serviceName": event.deep_get("protoPayload", "serviceName", default=""),
}
5 changes: 1 addition & 4 deletions global_helpers/global_filter_auth0.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all Auth0 detections
Expand All @@ -20,7 +17,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
# # not all Auth0 enterprise events have org
# # example: request domain
# # if we don't know the request_domain, we want default behavior to be to alert on this event.
# request_domain = deep_get(event, "data", "details", "request", "channel", default="")
# request_domain = event.deep_get("data", "details", "request", "channel", default="")
# return request_domain in ["https://manage.auth0.com/", "https://mycompany.auth0.com", ""]
#
return True
5 changes: 1 addition & 4 deletions global_helpers/global_filter_azuresignin.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all AzureSignIn detections
Expand All @@ -17,7 +14,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
#
# # example: event['tenantId']
# # if tenantId were missing, we want default behavior to be to alert on this event.
# tenant_id = deep_get(event, "tenantId", default="")
# tenant_id = event.deep_get("tenantId", default="")
# return event_origin in ["333333eb-a222-33cc-9baf-4a1111111111", ""]
#
return True
3 changes: 0 additions & 3 deletions global_helpers/global_filter_cloudflare.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all cloudflare detections
Expand Down
5 changes: 1 addition & 4 deletions global_helpers/global_filter_github.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all github detections
Expand All @@ -19,7 +16,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
#
# # not all github enterprise events have org
# # example: enterprise.self_hosted_runner_online
# org = deep_get(event, "org", default="")
# org = event.deep_get("org", default="")
# return org in ["my-prod-org", ""]
#
return True
5 changes: 1 addition & 4 deletions global_helpers/global_filter_notion.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all Notion detections
Expand All @@ -17,7 +14,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
#
# # example: workspace_id
# # if we don't know the workspace_id, we want default behavior to be to alert on this event.
# workspace_id = deep_get(event, "workspace_id", default="")
# workspace_id = event.deep_get("workspace_id", default="")
# return workspace_id in ["ea65b016-6abc-4dcf-808b-e000099999999", ""]
#
return True
5 changes: 1 addition & 4 deletions global_helpers/global_filter_snyk.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all snyk detections
Expand All @@ -16,7 +13,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
#
# # not all snyk audit events have orgId & projectId
# # example: group.user.add, sometimes api.access
# org = deep_get(event, "orgId", default="")
# org = event.deep_get("orgId", default="")
# return org in ["21111111-a222-4eee-8ddd-a99999999999", ""]
#
return True
5 changes: 1 addition & 4 deletions global_helpers/global_filter_tailscale.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all Tailscale detections
Expand All @@ -17,7 +14,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
#
# # example: event.origin
# # if we don't know the event_origin, we want default behavior to be to alert on this event.
# event_origin = deep_get(event, "event", "origin", default="")
# event_origin = event.deep_get("event", "origin", default="")
# return event_origin in ["ADMIN_CONSOLE", ""]
#
return True
5 changes: 1 addition & 4 deletions global_helpers/global_filter_tines.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all Tines detections
Expand All @@ -14,7 +11,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
# 1. the specific tenant_id mentioned.
# 2. events where tenant_id is undefined
#
# tenant_id = deep_get(event, "tenant_id", default="")
# tenant_id = event.deep_get("tenant_id", default="")
# return tenant_id in ["1234", ""]
#
return True
21 changes: 9 additions & 12 deletions global_helpers/panther_asana_helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
from panther_base_helpers import deep_get


def asana_alert_context(event: dict) -> dict:
def asana_alert_context(event) -> dict:
a_c = {
"actor": "<NO_ACTOR>",
"context": "<NO_CONTEXT>",
Expand All @@ -10,23 +7,23 @@ def asana_alert_context(event: dict) -> dict:
"resource_name": "<NO_RESOURCE_NAME>",
"resource_gid": "<NO_RESOURCE_GID>",
}
if deep_get(event, "actor", "actor_type", default="") == "user":
a_c["actor"] = deep_get(event, "actor", "email", default="<NO_ACTOR_EMAIL>")
if event.deep_get("actor", "actor_type", default="") == "user":
a_c["actor"] = event.deep_get("actor", "email", default="<NO_ACTOR_EMAIL>")
else:
a_c["actor"] = deep_get(event, "actor", "actor_type", default="<NO_ACTOR>")
a_c["actor"] = event.deep_get("actor", "actor_type", default="<NO_ACTOR>")
if "event_type" in event:
# Events have categories and event_type
# We have not seen category overlap -> only including event_type
a_c["event_type"] = event.get("event_type")
a_c["resource_name"] = deep_get(event, "resource", "name", default="<NO_RESOURCE_NAME>")
a_c["resource_gid"] = deep_get(event, "resource", "gid", default="<NO_RESOURCE_GID>")
r_t = deep_get(event, "resource", "resource_type")
a_c["resource_name"] = event.deep_get("resource", "name", default="<NO_RESOURCE_NAME>")
a_c["resource_gid"] = event.deep_get("resource", "gid", default="<NO_RESOURCE_GID>")
r_t = event.deep_get("resource", "resource_type")
if r_t:
a_c["resource_type"] = r_t
r_s_t = deep_get(event, "resource", "resource_subtype")
r_s_t = event.deep_get("resource", "resource_subtype")
if r_t and r_s_t and r_s_t != r_t:
a_c["resource_type"] += "__" + r_s_t
ctx = deep_get(event, "context", "context_type")
ctx = event.deep_get("context", "context_type")
if ctx:
a_c["context"] = ctx
return a_c
11 changes: 4 additions & 7 deletions global_helpers/panther_auth0_helpers.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
from panther_base_helpers import deep_get


def auth0_alert_context(event) -> dict:
a_c = {}
a_c["actor"] = deep_get(
event, "data", "details", "request", "auth", "user", default="<NO_ACTOR_FOUND>"
a_c["actor"] = event.deep_get(
"data", "details", "request", "auth", "user", default="<NO_ACTOR_FOUND>"
)
a_c["action"] = deep_get(event, "data", "description", default="<NO_ACTION_FOUND>")
a_c["action"] = event.deep_get("data", "description", default="<NO_ACTION_FOUND>")
return a_c


def is_auth0_config_event(event):
channel = deep_get(event, "data", "details", "request", "channel", default="")
channel = event.deep_get("data", "details", "request", "channel", default="")
return channel == "https://manage.auth0.com/"
21 changes: 9 additions & 12 deletions global_helpers/panther_azuresignin_helpers.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,26 @@
from panther_base_helpers import deep_get


def actor_user(event):
category = deep_get(event, "category", default="")
category = event.deep_get("category", default="")
if category in {"ServicePrincipalSignInLogs"}:
return deep_get(event, "properties", "servicePrincipalName")
return event.deep_get("properties", "servicePrincipalName")
if category in {"SignInLogs", "NonInteractiveUserSignInLogs"}:
return deep_get(event, "properties", "userPrincipalName")
return event.deep_get("properties", "userPrincipalName")
return None


def is_sign_in_event(event):
return deep_get(event, "operationName", default="") == "Sign-in activity"
return event.deep_get("operationName", default="") == "Sign-in activity"


def azure_signin_alert_context(event) -> dict:
ac_actor_user = actor_user(event)
if ac_actor_user is None:
ac_actor_user = "<NO_ACTORUSER>"
a_c = {}
a_c["tenantId"] = deep_get(event, "tenantId", default="<NO_TENANTID>")
a_c["source_ip"] = deep_get(event, "properties", "ipAddress", default="<NO_SOURCEIP>")
a_c["tenantId"] = event.deep_get("tenantId", default="<NO_TENANTID>")
a_c["source_ip"] = event.deep_get("properties", "ipAddress", default="<NO_SOURCEIP>")
a_c["actor_user"] = ac_actor_user
a_c["resourceDisplayName"] = deep_get(
event, "properties", "resourceDisplayName", default="<NO_RESOURCEDISPLAYNAME>"
a_c["resourceDisplayName"] = event.deep_get(
"properties", "resourceDisplayName", default="<NO_RESOURCEDISPLAYNAME>"
)
a_c["resourceId"] = deep_get(event, "properties", "resourceId", default="<NO_RESOURCEID>")
a_c["resourceId"] = event.deep_get("properties", "resourceId", default="<NO_RESOURCEID>")
return a_c
32 changes: 16 additions & 16 deletions global_helpers/panther_base_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,27 +269,27 @@ def filter_crowdstrike_fdr_event_type(event, name: str) -> bool:

def get_crowdstrike_field(event, field_name, default=None):
return (
deep_get(event, field_name)
or deep_get(event, "event", field_name)
or deep_get(event, "unknown_payload", field_name)
event.deep_get(field_name)
or event.deep_get("event", field_name)
or event.deep_get("unknown_payload", field_name)
or default
)


def slack_alert_context(event: dict):
def slack_alert_context(event):
return {
"actor-name": deep_get(event, "actor", "user", "name", default="<MISSING_NAME>"),
"actor-email": deep_get(event, "actor", "user", "email", default="<MISSING_EMAIL>"),
"actor-ip": deep_get(event, "context", "ip_address", default="<MISSING_IP>"),
"user-agent": deep_get(event, "context", "ua", default="<MISSING_UA>"),
"actor-name": event.deep_get("actor", "user", "name", default="<MISSING_NAME>"),
"actor-email": event.deep_get("actor", "user", "email", default="<MISSING_EMAIL>"),
"actor-ip": event.deep_get("context", "ip_address", default="<MISSING_IP>"),
"user-agent": event.deep_get("context", "ua", default="<MISSING_UA>"),
}


def github_alert_context(event: dict):
def github_alert_context(event):
return {
"action": event.get("action", ""),
"actor": event.get("actor", ""),
"actor_location": deep_get(event, "actor_location", "country_code"),
"actor_location": event.deep_get("actor_location", "country_code"),
"org": event.get("org", ""),
"repo": event.get("repo", ""),
"user": event.get("user", ""),
Expand Down Expand Up @@ -412,14 +412,14 @@ def aws_guardduty_context(event: dict):
}


def eks_panther_obj_ref(event: dict):
user = deep_get(event, "user", "username", default="<NO_USERNAME>")
def eks_panther_obj_ref(event):
user = event.deep_get("user", "username", default="<NO_USERNAME>")
source_ips = event.get("sourceIPs", ["0.0.0.0"]) # nosec
verb = event.get("verb", "<NO_VERB>")
obj_name = deep_get(event, "objectRef", "name", default="<NO_OBJECT_NAME>")
obj_ns = deep_get(event, "objectRef", "namespace", default="<NO_OBJECT_NAMESPACE>")
obj_res = deep_get(event, "objectRef", "resource", default="<NO_OBJECT_RESOURCE>")
obj_subres = deep_get(event, "objectRef", "subresource", default="")
obj_name = event.deep_get("objectRef", "name", default="<NO_OBJECT_NAME>")
obj_ns = event.deep_get("objectRef", "namespace", default="<NO_OBJECT_NAMESPACE>")
obj_res = event.deep_get("objectRef", "resource", default="<NO_OBJECT_RESOURCE>")
obj_subres = event.deep_get("objectRef", "subresource", default="")
p_source_label = event.get("p_source_label", "<NO_P_SOURCE_LABEL>")
if obj_subres:
obj_res = "/".join([obj_res, obj_subres])
Expand Down
Loading

0 comments on commit 1cbcd51

Please sign in to comment.