Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Cherry-Pick Merge: Add validation for duplicated access requests (#252) #278

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions e2e/slack/test_accessbot_slack_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ def test_access_command_grant_with_strange_casing(self, mocked_testbot):
assert "access request" in mocked_testbot.pop_message()
assert "Granting" in mocked_testbot.pop_message()

def test_access_command_fails_when_duplicating_request(self, mocked_testbot):
mocked_testbot.push_message(f"access to {resource_name}")
mocked_testbot.push_message(f"access to {resource_name}")
assert "valid request" in mocked_testbot.pop_message()
assert "access request" in mocked_testbot.pop_message()
assert "already have a pending grant request" in mocked_testbot.pop_message()


class Test_access_flow_from_access_form(ErrBotExtraTestSettings):
@pytest.fixture
def mocked_testbot(self, testbot):
Expand Down
6 changes: 6 additions & 0 deletions plugins/sdm/accessbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ def access_resource(self, message, match):
try:
self.get_arguments_helper().check_required_flags(flags_validators.keys(), self.config['REQUIRED_FLAGS'], flags)
self.check_requester_flag(message, flags.get('requester'))
self.__grant_requests_helper.check_request_already_exists(resource_name, GrantRequestType.ACCESS_RESOURCE, message.frm.person)
except Exception as e:
yield str(e)
return
Expand All @@ -198,6 +199,11 @@ def assign_role(self, message, match):
return
self.__metrics_helper.increment_access_requests()
role_name = re.sub(ASSIGN_ROLE_REGEX, "\\1", match.string.replace("*", ""), flags=re.IGNORECASE)
try:
self.__grant_requests_helper.check_request_already_exists(role_name, GrantRequestType.ASSIGN_ROLE, message.frm.person)
except Exception as e:
yield str(e)
return
yield from self.get_role_grant_helper().request_access(message, role_name)
self.__metrics_helper.reset_consecutive_errors()

Expand Down
8 changes: 8 additions & 0 deletions plugins/sdm/lib/helper/grant_request_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ def remove(self, request_id: str):
self.__grant_requests.pop(request_id, None)
self.save_state()

def check_request_already_exists(self, sdm_object_name: str, grant_request_type: GrantRequestType, user: str):
for grant_request in self.__grant_requests.values():
if grant_request["type"] == grant_request_type.value and grant_request["message"].frm.person == user \
and grant_request["sdm_object"].name.lower() == sdm_object_name.lower():
obj_type_name = "resource" if grant_request_type == GrantRequestType.ACCESS_RESOURCE else "role"
raise Exception(
f"You already have a pending grant request for that {obj_type_name}. Please, wait for an admin evaluation.")

def __sdm_model_to_dict(self, object):
return object if type(object) is dict else object.to_dict()

Expand Down
19 changes: 19 additions & 0 deletions plugins/sdm/lib/helper/test_grant_request_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,25 @@ def test_restore_state_when_has_stored_requests(self):
assert helper.get(request_id) is None
assert not helper.exists(request_id)

def test_raise_exception_when_add_duplicated_request(self):
with patch("os.path.isfile") as mock_isfile:
mock_isfile.side_effect = [True]
bot = get_mocked_bot()
with patch("builtins.open", mock_open(read_data=mocked_file_data)) as handle:
helper = GrantRequestHelper(bot)
assert helper.get(request_id) is not None
assert helper.exists(request_id)
assert len(helper.get_request_ids()) == 1
file = handle()
file.read.assert_called_once()
with pytest.raises(Exception) as e:
helper.add(request_id, get_mocked_message(), get_mock_sdm_object(), get_mock_sdm_account(),
GrantRequestType.ACCESS_RESOURCE)
assert "already have a pending grant request" in e.value
helper.remove(request_id)
assert helper.get(request_id) is None
assert not helper.exists(request_id)

def test_dont_restore_state_when_has_stored_requests_and_is_disabled(self):
with patch("os.path.isfile") as mock_isfile:
mock_isfile.side_effect = [True]
Expand Down