-
Notifications
You must be signed in to change notification settings - Fork 176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Helper reorg #1380
Helper reorg #1380
Changes from all commits
4a1980b
a31228a
bad7515
9518e9e
4bfc985
8788c62
67bb796
0dba3cb
9c3f91a
5c25f06
755bfc6
b0899ab
9580235
bc174b8
1936fed
b6434b3
bbb016c
f5d800d
8b16cf9
603d5fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
This file was deleted.
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,14 +19,16 @@ | |
|
||
import panther_asana_helpers as p_a_h # pylint: disable=C0413 | ||
import panther_auth0_helpers as p_auth0_h # pylint: disable=C0413 | ||
import panther_aws_helpers as p_aws_h # pylint: disable=C0413 | ||
import panther_azuresignin_helpers as p_asi_h # pylint: disable=C0413 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe |
||
import panther_base_helpers as p_b_h # pylint: disable=C0413 | ||
import panther_box_helpers as p_box_h # pylint: disable=C0413 | ||
import panther_cloudflare_helpers as p_cf_h # pylint: disable=C0413 | ||
import panther_crowdstrike_fdr_helpers as p_cf_fdr_h # pylint: disable=C0413 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe |
||
import panther_greynoise_helpers as p_greynoise_h # pylint: disable=C0413 | ||
import panther_ipinfo_helpers as p_i_h # pylint: disable=C0413 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
import panther_lookuptable_helpers as p_l_h # pylint: disable=C0413 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
import panther_notion_helpers as p_notion_h # pylint: disable=C0413 | ||
import panther_oss_helpers as p_o_h # pylint: disable=C0413 | ||
import panther_snyk_helpers as p_snyk_h # pylint: disable=C0413 | ||
import panther_tailscale_helpers as p_tscale_h # pylint: disable=C0413 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
import panther_tines_helpers as p_tines_h # pylint: disable=C0413 | ||
|
@@ -94,7 +96,7 @@ def setUp(self): | |
) | ||
|
||
def test_complete_event(self): | ||
response = p_b_h.eks_panther_obj_ref(self.event) | ||
response = p_aws_h.eks_panther_obj_ref(self.event) | ||
self.assertEqual(response.get("actor", ""), "kubernetes-admin") | ||
self.assertEqual(response.get("object", ""), "some-job-xxx1y") | ||
self.assertEqual(response.get("ns", ""), "default") | ||
|
@@ -112,7 +114,7 @@ def test_all_missing_event(self): | |
del temp_event["verb"] | ||
del temp_event["p_source_label"] | ||
temp_event = PantherEvent(temp_event) | ||
response = p_b_h.eks_panther_obj_ref(temp_event) | ||
response = p_aws_h.eks_panther_obj_ref(temp_event) | ||
self.assertEqual(response.get("actor", ""), "<NO_USERNAME>") | ||
self.assertEqual(response.get("object", ""), "<NO_OBJECT_NAME>") | ||
self.assertEqual(response.get("ns", ""), "<NO_OBJECT_NAMESPACE>") | ||
|
@@ -126,7 +128,7 @@ def test_missing_subresource_event(self): | |
temp_event = self.event.to_dict() | ||
del temp_event["objectRef"]["subresource"] | ||
temp_event = PantherEvent(temp_event) | ||
response = p_b_h.eks_panther_obj_ref(temp_event) | ||
response = p_aws_h.eks_panther_obj_ref(temp_event) | ||
self.assertEqual(response.get("resource", ""), "pods") | ||
|
||
|
||
|
@@ -168,37 +170,37 @@ def setUp(self): | |
|
||
def test_additional_details_string(self): | ||
event = ImmutableCaseInsensitiveDict({"additional_details": self.initial_str}) | ||
returns = p_b_h.box_parse_additional_details(event) | ||
returns = p_box_h.box_parse_additional_details(event) | ||
self.assertEqual(returns.get("t", 0), 10) | ||
|
||
# in the case of a byte array, we expect the empty dict | ||
def test_additional_details_bytes(self): | ||
event = ImmutableCaseInsensitiveDict({"additional_details": self.initial_bytes}) | ||
returns = p_b_h.box_parse_additional_details(event) | ||
returns = p_box_h.box_parse_additional_details(event) | ||
self.assertEqual(len(returns), 0) | ||
|
||
# In the case of a list ( not a string or bytes array ), expect un-altered return | ||
def test_additional_details_list(self): | ||
event = ImmutableCaseInsensitiveDict({"additional_details": self.initial_list}) | ||
returns = p_b_h.box_parse_additional_details(event) | ||
returns = p_box_h.box_parse_additional_details(event) | ||
self.assertEqual(len(returns), 4) | ||
|
||
# in the case of a dict or similar, we expect it to be returned un-altered | ||
def test_additional_details_dict(self): | ||
event = ImmutableCaseInsensitiveDict({"additional_details": self.initial_dict}) | ||
returns = p_b_h.box_parse_additional_details(event) | ||
returns = p_box_h.box_parse_additional_details(event) | ||
self.assertEqual(returns.get("t", 0), 10) | ||
|
||
# If it's a string with no json object to be decoded, we expect an empty dict back | ||
def test_additional_details_plain_str(self): | ||
event = ImmutableCaseInsensitiveDict({"additional_details": self.initial_str_no_json}) | ||
returns = p_b_h.box_parse_additional_details(event) | ||
returns = p_box_h.box_parse_additional_details(event) | ||
self.assertEqual(len(returns), 0) | ||
|
||
# If it's a string with a json list, we expect the list | ||
def test_additional_details_str_list_json(self): | ||
event = ImmutableCaseInsensitiveDict({"additional_details": self.initial_str_list_json}) | ||
returns = p_b_h.box_parse_additional_details(event) | ||
returns = p_box_h.box_parse_additional_details(event) | ||
self.assertEqual(len(returns), 4) | ||
|
||
|
||
|
@@ -1100,11 +1102,11 @@ def setUp(self): | |
) | ||
|
||
def test_is_different_with_fdr_event_type_provided(self): | ||
response = p_b_h.filter_crowdstrike_fdr_event_type(self.input, "SomethingElse") | ||
response = p_cf_fdr_h.filter_crowdstrike_fdr_event_type(self.input, "SomethingElse") | ||
self.assertEqual(response, True) | ||
|
||
def test_is_same_with_the_fdr_event_type_provided(self): | ||
response = p_b_h.filter_crowdstrike_fdr_event_type(self.input, "DnsRequest") | ||
response = p_cf_fdr_h.filter_crowdstrike_fdr_event_type(self.input, "DnsRequest") | ||
self.assertEqual(response, False) | ||
|
||
def test_is_entirely_different_type(self): | ||
|
@@ -1115,7 +1117,7 @@ def test_is_entirely_different_type(self): | |
"event": {"foo": "bar"}, | ||
} | ||
) | ||
response = p_b_h.filter_crowdstrike_fdr_event_type(self.input, "DnsRequest") | ||
response = p_cf_fdr_h.filter_crowdstrike_fdr_event_type(self.input, "DnsRequest") | ||
self.assertEqual(response, False) | ||
|
||
|
||
|
@@ -1131,30 +1133,30 @@ def setUp(self): | |
) | ||
|
||
def test_input_key_default_works(self): | ||
response = p_b_h.get_crowdstrike_field(self.input, "zee", default="hello") | ||
response = p_cf_fdr_h.get_crowdstrike_field(self.input, "zee", default="hello") | ||
self.assertEqual(response, "hello") | ||
|
||
def test_input_key_does_not_exist(self): | ||
response = p_b_h.get_crowdstrike_field(self.input, "zee") | ||
response = p_cf_fdr_h.get_crowdstrike_field(self.input, "zee") | ||
self.assertEqual(response, None) | ||
|
||
def test_input_key_exists(self): | ||
response = p_b_h.get_crowdstrike_field(self.input, "cid") | ||
response = p_cf_fdr_h.get_crowdstrike_field(self.input, "cid") | ||
self.assertEqual(response, "something") | ||
|
||
def test_input_key_can_be_found_in_event(self): | ||
response = p_b_h.get_crowdstrike_field(self.input, "foo") | ||
response = p_cf_fdr_h.get_crowdstrike_field(self.input, "foo") | ||
self.assertEqual(response, "bar") | ||
|
||
def test_input_key_can_be_found_in_unknown(self): | ||
response = p_b_h.get_crowdstrike_field(self.input, "field") | ||
response = p_cf_fdr_h.get_crowdstrike_field(self.input, "field") | ||
self.assertEqual(response, "is") | ||
|
||
def test_precedence(self): | ||
temp_event = self.input.to_dict() | ||
temp_event["event"]["field"] = "found" | ||
temp_event = PantherEvent(temp_event) | ||
response = p_b_h.get_crowdstrike_field(temp_event, "field") | ||
response = p_cf_fdr_h.get_crowdstrike_field(temp_event, "field") | ||
self.assertEqual(response, "found") | ||
|
||
|
||
|
@@ -1974,10 +1976,10 @@ def setUp(self): | |
) | ||
|
||
def test_distances(self): | ||
nyc_to_sfo = p_o_h.km_between_ipinfo_loc(self.loc_nyc, self.loc_sfo) | ||
nyc_to_athens = p_o_h.km_between_ipinfo_loc(self.loc_nyc, self.loc_athens) | ||
nyc_to_aukland = p_o_h.km_between_ipinfo_loc(self.loc_nyc, self.loc_aukland) | ||
aukland_to_nyc = p_o_h.km_between_ipinfo_loc(self.loc_aukland, self.loc_nyc) | ||
nyc_to_sfo = p_i_h.km_between_ipinfo_loc(self.loc_nyc, self.loc_sfo) | ||
nyc_to_athens = p_i_h.km_between_ipinfo_loc(self.loc_nyc, self.loc_athens) | ||
nyc_to_aukland = p_i_h.km_between_ipinfo_loc(self.loc_nyc, self.loc_aukland) | ||
aukland_to_nyc = p_i_h.km_between_ipinfo_loc(self.loc_aukland, self.loc_nyc) | ||
# I used https://www.nhc.noaa.gov/gccalc.shtml to get test comparison distances | ||
# | ||
# delta is set to 0.5% of total computed distanc from gccalc | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting, the
disable=C0413
is all over the place. I can address it after you merge this PR 😄