diff --git a/Makefile b/Makefile index a666eb027..9c91ad3ef 100644 --- a/Makefile +++ b/Makefile @@ -32,6 +32,9 @@ deps-update: global-helpers-unit-test: pipenv run python -m unittest global_helpers/*_test.py +data-models-unit-test: + pipenv run python -m unittest data_models/*_test.py + lint: lint-pylint lint-fmt lint-pylint: @@ -59,7 +62,7 @@ fmt: install: pipenv sync --dev -test: global-helpers-unit-test +test: global-helpers-unit-test data-models-unit-test pipenv run panther_analysis_tool test $(TEST_ARGS) check-deprecated: diff --git a/data_models/aws_cloudtrail_data_model.py b/data_models/aws_cloudtrail_data_model.py index 20ad3e3a7..2715b871b 100644 --- a/data_models/aws_cloudtrail_data_model.py +++ b/data_models/aws_cloudtrail_data_model.py @@ -43,9 +43,7 @@ def load_ip_address(event): # https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-event-reference-user-identity.html#cloudtrail-event-reference-user-identity-fields def get_actor_user(event): user_type = deep_get(event, "userIdentity", "type") - if event.get("eventType") == "AwsServiceEvent": - actor_user = deep_get(event, "userIdentity", "invokedBy", default="UnknownAwsServiceEvent") - elif user_type == "Root": + if user_type == "Root": actor_user = deep_get( event, "userIdentity", @@ -69,6 +67,8 @@ def get_actor_user(event): ) elif user_type in ("AWSService", "AWSAccount"): actor_user = event.get("sourceIdentity", f"Unknown{user_type}") + elif event.get("eventType") == "AwsServiceEvent": + actor_user = deep_get(event, "userIdentity", "invokedBy", default="UnknownAwsServiceEvent") else: actor_user = "UnknownUser" return actor_user diff --git a/data_models/data_models_test.py b/data_models/data_models_test.py new file mode 100644 index 000000000..9b1419ebc --- /dev/null +++ b/data_models/data_models_test.py @@ -0,0 +1,74 @@ +import os +import sys +import unittest + +from panther_analysis_tool.main import load_analysis, setup_data_models +from panther_core.enriched_event import PantherEvent + +# pipenv run does the right thing, but IDE based debuggers may fail to import +# so noting, we append this directory to sys.path +sys.path.append(os.path.dirname(__file__)) +sys.path.append(os.path.dirname(__file__.replace("data_models", "global_helpers"))) + +specs, invalid_specs = load_analysis(os.path.dirname(__file__), [], [], []) +log_type_to_data_model, invalid_data_models = setup_data_models(specs.data_models) + + +class TestAWSCloudTrailDataModel(unittest.TestCase): + data_model = log_type_to_data_model.get("AWS.CloudTrail") + + def test_get_actor_user(self): + base_event = { + "p_log_type": "AWS.CloudTrail", + "userIdentity": { + "type": "user_type", + "principalId": "AIDAJ45Q7YFFAREXAMPLE", + "arn": "arn:aws:iam::123456789012:user/Alice", + "accountId": "Root", + "accessKeyId": "", + "userName": "Root,IAMUser,Directory,Unknown,SAMLUser,WebIdentityUser", + "sessionContext": { + "sessionIssuer": { + "type": "Role", + "principalId": "AROAIDPPEZS35WEXAMPLE", + "arn": "arn:aws:iam::123456789012:role/RoleToBeAssumed", + "accountId": "123456789012", + "userName": "AssumedRole,Role,FederatedUser", + }, + }, + }, + "additionalEventData": {"CredentialType": "PASSWORD", "UserName": "IdentityCenterUser"}, + "sourceIdentity": "AWSService,AWSAccount", + } + + aws_service_event = PantherEvent( + { + "p_log_type": "AWS.CloudTrail", + "eventType": "AwsServiceEvent", + "userIdentity": {"invokedBy": "AwsServiceEvent"}, + }, + self.data_model, + ) + + user_types = ( + "Root", + "IAMUser", + "Directory", + "Unknown", + "SAMLUser", + "WebIdentityUser", + "AssumedRole", + "Role", + "FederatedUser", + "IdentityCenterUser", + "AWSService", + "AWSAccount", + ) + + for user_type in user_types: + event = PantherEvent( + base_event | {"userIdentity": {"type": user_type}}, self.data_model + ) + self.assertTrue(user_type in event.udm("actor_user")) + + self.assertEqual("AwsServiceEvent", aws_service_event.udm("actor_user"))