Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

THREAT-397 Reformat deep_get(event to event.deep_get( #1374

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions .scripts/mitre_mapping_check.py
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file was reformatted by black

Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,26 @@
# All MITRE Tags must match this regex pattern
MITRE_PATTERN = re.compile("^TA\d+\:T\d+(\.\d+)?$")


def main(path: Path) -> bool:
# Load Repo
analysis_items = load_analysis_specs([path], ignore_files=[])

items_with_invalid_mappings = [] # Record all items with bad tags
items_with_invalid_mappings = [] # Record all items with bad tags
for analysis_item in analysis_items:
rel_path = analysis_item[0] # Relative path to YAML file
spec = analysis_item[2] # YAML spec as a dict
rel_path = analysis_item[0] # Relative path to YAML file
spec = analysis_item[2] # YAML spec as a dict

bad_tags = [] # Record the invalid tags for this analysis item
bad_tags = [] # Record the invalid tags for this analysis item
if reports := spec.get("Reports"):
if mitre := reports.get("MITRE ATT&CK"):
for mapping in mitre:
if not MITRE_PATTERN.match(mapping):
bad_tags.append(mapping)

if bad_tags:
items_with_invalid_mappings.append({
"rel_path": rel_path,
"bad_tags": bad_tags
})

items_with_invalid_mappings.append({"rel_path": rel_path, "bad_tags": bad_tags})

if items_with_invalid_mappings:
print("❌ Some items had invalid MITRE mapping formats:")
print()
Expand All @@ -42,16 +40,21 @@ def main(path: Path) -> bool:
print("\t" + bad_tag)
print()

print(("To ensure that your MITRE mappings are correctly displayed in the Panther "
"console, make sure your MITRE mappings are formatted like 'TA0000:T0000'."))
print(
(
"To ensure that your MITRE mappings are correctly displayed in the Panther "
"console, make sure your MITRE mappings are formatted like 'TA0000:T0000'."
)
)
else:
print("✅ No invalid MITRE mappings found! You're in the clear! 👍")

return bool(items_with_invalid_mappings)


if __name__ == "__main__":
path = Path.cwd() # Default to current directory
path = Path.cwd() # Default to current directory
if len(sys.argv) > 1:
path = Path(sys.argv[1])
if main(path):
exit(1) # Exit with error if issues were found
exit(1) # Exit with error if issues were found
23 changes: 10 additions & 13 deletions global_helpers/gcp_base_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get


def get_info(event):
fields = {
"principal": "protoPayload.authenticationInfo.principalEmail",
Expand All @@ -9,15 +6,15 @@ def get_info(event):
"user_agent": "protoPayload.requestMetadata.callerSuppliedUserAgent",
"method_name": "protoPayload.methodName",
}
return {name: deep_get(event, *(path.split("."))) for name, path in fields.items()}
return {name: event.deep_get(*(path.split("."))) for name, path in fields.items()}


def get_k8s_info(event):
"""
Get GCP K8s info such as pod, authorized user etc.
return a tuple of strings
"""
pod_slug = deep_get(event, "protoPayload", "resourceName")
pod_slug = event.deep_get("protoPayload", "resourceName")
# core/v1/namespaces/<namespace>/pods/<pod-id>/<action>
_, _, _, namespace, _, pod, _ = pod_slug.split("/")
return get_info(event) | {"namespace": namespace, "pod": pod}
Expand All @@ -33,17 +30,17 @@ def get_flow_log_info(event):
"bytes_sent": "jsonPayload.bytes_sent",
"reporter": "jsonPayload.reporter",
}
return {name: deep_get(event, *(path.split("."))) for name, path in fields.items()}
return {name: event.deep_get(*(path.split("."))) for name, path in fields.items()}


def gcp_alert_context(event):
return {
"project": deep_get(event, "resource", "labels", "project_id", default=""),
"principal": deep_get(
event, "protoPayload", "authenticationInfo", "principalEmail", default=""
"project": event.deep_get("resource", "labels", "project_id", default=""),
"principal": event.deep_get(
"protoPayload", "authenticationInfo", "principalEmail", default=""
),
"caller_ip": deep_get(event, "protoPayload", "requestMetadata", "callerIP", default=""),
"methodName": deep_get(event, "protoPayload", "methodName", default=""),
"resourceName": deep_get(event, "protoPayload", "resourceName", default=""),
"serviceName": deep_get(event, "protoPayload", "serviceName", default=""),
"caller_ip": event.deep_get("protoPayload", "requestMetadata", "callerIP", default=""),
"methodName": event.deep_get("protoPayload", "methodName", default=""),
"resourceName": event.deep_get("protoPayload", "resourceName", default=""),
"serviceName": event.deep_get("protoPayload", "serviceName", default=""),
}
5 changes: 1 addition & 4 deletions global_helpers/global_filter_auth0.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all Auth0 detections
Expand All @@ -20,7 +17,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
# # not all Auth0 enterprise events have org
# # example: request domain
# # if we don't know the request_domain, we want default behavior to be to alert on this event.
# request_domain = deep_get(event, "data", "details", "request", "channel", default="")
# request_domain = event.deep_get("data", "details", "request", "channel", default="")
# return request_domain in ["https://manage.auth0.com/", "https://mycompany.auth0.com", ""]
#
return True
5 changes: 1 addition & 4 deletions global_helpers/global_filter_azuresignin.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all AzureSignIn detections
Expand All @@ -17,7 +14,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
#
# # example: event['tenantId']
# # if tenantId were missing, we want default behavior to be to alert on this event.
# tenant_id = deep_get(event, "tenantId", default="")
# tenant_id = event.deep_get("tenantId", default="")
# return event_origin in ["333333eb-a222-33cc-9baf-4a1111111111", ""]
#
return True
3 changes: 0 additions & 3 deletions global_helpers/global_filter_cloudflare.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all cloudflare detections
Expand Down
5 changes: 1 addition & 4 deletions global_helpers/global_filter_github.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all github detections
Expand All @@ -19,7 +16,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
#
# # not all github enterprise events have org
# # example: enterprise.self_hosted_runner_online
# org = deep_get(event, "org", default="")
# org = event.deep_get("org", default="")
# return org in ["my-prod-org", ""]
#
return True
5 changes: 1 addition & 4 deletions global_helpers/global_filter_notion.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all Notion detections
Expand All @@ -17,7 +14,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
#
# # example: workspace_id
# # if we don't know the workspace_id, we want default behavior to be to alert on this event.
# workspace_id = deep_get(event, "workspace_id", default="")
# workspace_id = event.deep_get("workspace_id", default="")
# return workspace_id in ["ea65b016-6abc-4dcf-808b-e000099999999", ""]
#
return True
5 changes: 1 addition & 4 deletions global_helpers/global_filter_snyk.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all snyk detections
Expand All @@ -16,7 +13,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
#
# # not all snyk audit events have orgId & projectId
# # example: group.user.add, sometimes api.access
# org = deep_get(event, "orgId", default="")
# org = event.deep_get("orgId", default="")
# return org in ["21111111-a222-4eee-8ddd-a99999999999", ""]
#
return True
5 changes: 1 addition & 4 deletions global_helpers/global_filter_tailscale.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all Tailscale detections
Expand All @@ -17,7 +14,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
#
# # example: event.origin
# # if we don't know the event_origin, we want default behavior to be to alert on this event.
# event_origin = deep_get(event, "event", "origin", default="")
# event_origin = event.deep_get("event", "origin", default="")
# return event_origin in ["ADMIN_CONSOLE", ""]
#
return True
5 changes: 1 addition & 4 deletions global_helpers/global_filter_tines.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from panther_base_helpers import deep_get # pylint: disable=unused-import


def filter_include_event(event) -> bool: # pylint: disable=unused-argument
"""
filter_include_event provides a global include filter for all Tines detections
Expand All @@ -14,7 +11,7 @@ def filter_include_event(event) -> bool: # pylint: disable=unused-argument
# 1. the specific tenant_id mentioned.
# 2. events where tenant_id is undefined
#
# tenant_id = deep_get(event, "tenant_id", default="")
# tenant_id = event.deep_get("tenant_id", default="")
# return tenant_id in ["1234", ""]
#
return True
21 changes: 9 additions & 12 deletions global_helpers/panther_asana_helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
from panther_base_helpers import deep_get


def asana_alert_context(event: dict) -> dict:
def asana_alert_context(event) -> dict:
a_c = {
"actor": "<NO_ACTOR>",
"context": "<NO_CONTEXT>",
Expand All @@ -10,23 +7,23 @@ def asana_alert_context(event: dict) -> dict:
"resource_name": "<NO_RESOURCE_NAME>",
"resource_gid": "<NO_RESOURCE_GID>",
}
if deep_get(event, "actor", "actor_type", default="") == "user":
a_c["actor"] = deep_get(event, "actor", "email", default="<NO_ACTOR_EMAIL>")
if event.deep_get("actor", "actor_type", default="") == "user":
a_c["actor"] = event.deep_get("actor", "email", default="<NO_ACTOR_EMAIL>")
else:
a_c["actor"] = deep_get(event, "actor", "actor_type", default="<NO_ACTOR>")
a_c["actor"] = event.deep_get("actor", "actor_type", default="<NO_ACTOR>")
if "event_type" in event:
# Events have categories and event_type
# We have not seen category overlap -> only including event_type
a_c["event_type"] = event.get("event_type")
a_c["resource_name"] = deep_get(event, "resource", "name", default="<NO_RESOURCE_NAME>")
a_c["resource_gid"] = deep_get(event, "resource", "gid", default="<NO_RESOURCE_GID>")
r_t = deep_get(event, "resource", "resource_type")
a_c["resource_name"] = event.deep_get("resource", "name", default="<NO_RESOURCE_NAME>")
a_c["resource_gid"] = event.deep_get("resource", "gid", default="<NO_RESOURCE_GID>")
r_t = event.deep_get("resource", "resource_type")
if r_t:
a_c["resource_type"] = r_t
r_s_t = deep_get(event, "resource", "resource_subtype")
r_s_t = event.deep_get("resource", "resource_subtype")
if r_t and r_s_t and r_s_t != r_t:
a_c["resource_type"] += "__" + r_s_t
ctx = deep_get(event, "context", "context_type")
ctx = event.deep_get("context", "context_type")
if ctx:
a_c["context"] = ctx
return a_c
11 changes: 4 additions & 7 deletions global_helpers/panther_auth0_helpers.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
from panther_base_helpers import deep_get


def auth0_alert_context(event) -> dict:
a_c = {}
a_c["actor"] = deep_get(
event, "data", "details", "request", "auth", "user", default="<NO_ACTOR_FOUND>"
a_c["actor"] = event.deep_get(
"data", "details", "request", "auth", "user", default="<NO_ACTOR_FOUND>"
)
a_c["action"] = deep_get(event, "data", "description", default="<NO_ACTION_FOUND>")
a_c["action"] = event.deep_get("data", "description", default="<NO_ACTION_FOUND>")
return a_c


def is_auth0_config_event(event):
channel = deep_get(event, "data", "details", "request", "channel", default="")
channel = event.deep_get("data", "details", "request", "channel", default="")
return channel == "https://manage.auth0.com/"
21 changes: 9 additions & 12 deletions global_helpers/panther_azuresignin_helpers.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,26 @@
from panther_base_helpers import deep_get


def actor_user(event):
category = deep_get(event, "category", default="")
category = event.deep_get("category", default="")
if category in {"ServicePrincipalSignInLogs"}:
return deep_get(event, "properties", "servicePrincipalName")
return event.deep_get("properties", "servicePrincipalName")
if category in {"SignInLogs", "NonInteractiveUserSignInLogs"}:
return deep_get(event, "properties", "userPrincipalName")
return event.deep_get("properties", "userPrincipalName")
return None


def is_sign_in_event(event):
return deep_get(event, "operationName", default="") == "Sign-in activity"
return event.deep_get("operationName", default="") == "Sign-in activity"


def azure_signin_alert_context(event) -> dict:
ac_actor_user = actor_user(event)
if ac_actor_user is None:
ac_actor_user = "<NO_ACTORUSER>"
a_c = {}
a_c["tenantId"] = deep_get(event, "tenantId", default="<NO_TENANTID>")
a_c["source_ip"] = deep_get(event, "properties", "ipAddress", default="<NO_SOURCEIP>")
a_c["tenantId"] = event.deep_get("tenantId", default="<NO_TENANTID>")
a_c["source_ip"] = event.deep_get("properties", "ipAddress", default="<NO_SOURCEIP>")
a_c["actor_user"] = ac_actor_user
a_c["resourceDisplayName"] = deep_get(
event, "properties", "resourceDisplayName", default="<NO_RESOURCEDISPLAYNAME>"
a_c["resourceDisplayName"] = event.deep_get(
"properties", "resourceDisplayName", default="<NO_RESOURCEDISPLAYNAME>"
)
a_c["resourceId"] = deep_get(event, "properties", "resourceId", default="<NO_RESOURCEID>")
a_c["resourceId"] = event.deep_get("properties", "resourceId", default="<NO_RESOURCEID>")
return a_c
32 changes: 16 additions & 16 deletions global_helpers/panther_base_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,27 +269,27 @@ def filter_crowdstrike_fdr_event_type(event, name: str) -> bool:

def get_crowdstrike_field(event, field_name, default=None):
return (
deep_get(event, field_name)
or deep_get(event, "event", field_name)
or deep_get(event, "unknown_payload", field_name)
event.deep_get(field_name)
or event.deep_get("event", field_name)
or event.deep_get("unknown_payload", field_name)
or default
)


def slack_alert_context(event: dict):
def slack_alert_context(event):
return {
"actor-name": deep_get(event, "actor", "user", "name", default="<MISSING_NAME>"),
"actor-email": deep_get(event, "actor", "user", "email", default="<MISSING_EMAIL>"),
"actor-ip": deep_get(event, "context", "ip_address", default="<MISSING_IP>"),
"user-agent": deep_get(event, "context", "ua", default="<MISSING_UA>"),
"actor-name": event.deep_get("actor", "user", "name", default="<MISSING_NAME>"),
"actor-email": event.deep_get("actor", "user", "email", default="<MISSING_EMAIL>"),
"actor-ip": event.deep_get("context", "ip_address", default="<MISSING_IP>"),
"user-agent": event.deep_get("context", "ua", default="<MISSING_UA>"),
}


def github_alert_context(event: dict):
def github_alert_context(event):
return {
"action": event.get("action", ""),
"actor": event.get("actor", ""),
"actor_location": deep_get(event, "actor_location", "country_code"),
"actor_location": event.deep_get("actor_location", "country_code"),
"org": event.get("org", ""),
"repo": event.get("repo", ""),
"user": event.get("user", ""),
Expand Down Expand Up @@ -412,14 +412,14 @@ def aws_guardduty_context(event: dict):
}


def eks_panther_obj_ref(event: dict):
user = deep_get(event, "user", "username", default="<NO_USERNAME>")
def eks_panther_obj_ref(event):
user = event.deep_get("user", "username", default="<NO_USERNAME>")
source_ips = event.get("sourceIPs", ["0.0.0.0"]) # nosec
verb = event.get("verb", "<NO_VERB>")
obj_name = deep_get(event, "objectRef", "name", default="<NO_OBJECT_NAME>")
obj_ns = deep_get(event, "objectRef", "namespace", default="<NO_OBJECT_NAMESPACE>")
obj_res = deep_get(event, "objectRef", "resource", default="<NO_OBJECT_RESOURCE>")
obj_subres = deep_get(event, "objectRef", "subresource", default="")
obj_name = event.deep_get("objectRef", "name", default="<NO_OBJECT_NAME>")
obj_ns = event.deep_get("objectRef", "namespace", default="<NO_OBJECT_NAMESPACE>")
obj_res = event.deep_get("objectRef", "resource", default="<NO_OBJECT_RESOURCE>")
obj_subres = event.deep_get("objectRef", "subresource", default="")
p_source_label = event.get("p_source_label", "<NO_P_SOURCE_LABEL>")
if obj_subres:
obj_res = "/".join([obj_res, obj_subres])
Expand Down
Loading
Loading