Skip to content

Commit

Permalink
split Standard.BruteForceByIp into IP and Username versions
Browse files Browse the repository at this point in the history
  • Loading branch information
ben-githubs committed Dec 19, 2024
1 parent 0256e82 commit b1694e9
Show file tree
Hide file tree
Showing 4 changed files with 496 additions and 1 deletion.
1 change: 1 addition & 0 deletions packs/standard_ruleset.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ PackDefinition:
# Standard Detections
- Standard.AdminRoleAssigned
- Standard.BruteForceByIP
- Standard.BruteForceByUser
- Standard.ImpossibleTravel.Login
- Standard.MFADisabled
- Standard.NewAWSAccountCreated
Expand Down
3 changes: 2 additions & 1 deletion rules/standard_rules/brute_force_by_ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ def title(event):
# use unified data model field in title
log_type = event.get("p_log_type")
title_str = (
f"{log_type}: User [{event.udm('actor_user')}] has exceeded the failed logins threshold"
f"{log_type}: Login attempts from IP [{event.udm('source_ip')}] "
"have exceeded the failed logins threshold"
)
if log_type == "AWS.CloudTrail":
title_str += f" in [{lookup_aws_account_name(event.get('recipientAccountId'))}]"
Expand Down
45 changes: 45 additions & 0 deletions rules/standard_rules/brute_force_by_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from json import loads

import panther_event_type_helpers as event_type
from panther_aws_helpers import lookup_aws_account_name
from panther_base_helpers import add_parse_delay
from panther_ipinfo_helpers import PantherIPInfoException, geoinfo_from_ip


def rule(event):
# filter events on unified data model field
return event.udm("event_type") == event_type.FAILED_LOGIN


def title(event):
# use unified data model field in title
log_type = event.get("p_log_type")
title_str = (
f"{log_type}: User [{event.udm('actor_user')}] has exceeded the failed logins threshold"
)
if log_type == "AWS.CloudTrail":
title_str += f" in [{lookup_aws_account_name(event.get('recipientAccountId'))}]"
return title_str


def alert_context(event):
try:
geoinfo = geoinfo_from_ip(event=event, match_field=event.udm_path("source_ip"))
except PantherIPInfoException:
geoinfo = {}
if isinstance(geoinfo, str):
geoinfo = loads(geoinfo)
context = {}
context["geolocation"] = (
f"{geoinfo.get('city')}, {geoinfo.get('region')} in " f"{geoinfo.get('country')}"
)
context["ip"] = geoinfo.get("ip")
context["reverse_lookup"] = geoinfo.get("hostname", "No reverse lookup hostname")
context["ip_org"] = geoinfo.get("org", "No organization listed")
try:
context = add_parse_delay(event, context)
except TypeError:
pass
except AttributeError:
pass
return context
Loading

0 comments on commit b1694e9

Please sign in to comment.