Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(access): performance fixes #473

Merged
merged 16 commits into from
Jan 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,25 @@ within the `deploy/` directory there is a docker compose file. running `docker c

You may need to run migrations if your not mounting your own DB. to do this run `docker exec -ti centurion-erp python manage.py migrate`

## Page speed tests

to run page speed tests (requires a working prometheus and grafa setup). use the following


``` bash

clear; \
K6_PROMETHEUS_RW_TREND_STATS="p(99),p(95),p(90),max,min" \
K6_PROMETHEUS_RW_SERVER_URL=http://<prometheus url>:9090/api/v1/write \
BASE_URL="http://127.0.0.1:8002" \
AUTH_TOKEN="< api token of superuser>" \
k6 run \
-o experimental-prometheus-rw \
--tag "commit=$(git rev-parse HEAD)" \
--tag "testid=<name of test for ref>" \
test/page_speed.js

```


# Old working docs
Expand Down
154 changes: 154 additions & 0 deletions app/access/middleware/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
from django.contrib.auth.middleware import (
AuthenticationMiddleware,
SimpleLazyObject,
partial,
)
from django.contrib.auth.models import User, Group

from access.models import Organization, Team

from settings.models.app_settings import AppSettings


class AuthenticationMiddleware(AuthenticationMiddleware):


def process_request(self, request):

super().process_request(request)

request.app_settings = AppSettings.objects.select_related('global_organization').get(
owner_organization = None
)

request.tenancy = Tenancy(request.user, request.app_settings)



class Tenancy:

user: User = None

groups: list([Group]) = None

_app_settings: AppSettings = None


_user_organizations: list([Organization]) = None
"""Cached User Organizations"""

_user_teams: list([Team]) = None
"""Cached User Teams"""


_user_permissions: list([str]) = None
"""Cached User User Permissions"""



def __init__(self, user: User, app_settings: AppSettings):

self.user = user

self. _app_settings = app_settings

self.groups = user.groups.select_related('team', 'team__organization').prefetch_related('team__permissions__content_type')

self._user_organizations = []

self._user_groups = []

self._user_teams = []

self._user_permissions = []


for group in self.groups:

if group.team not in self._user_teams:

self._user_teams += [ group.team ]

for permission in group.team.permissions.all():

permission_value = str( permission.content_type.app_label + '.' + permission.codename )

if permission_value not in self._user_permissions:

self._user_permissions += [ permission_value ]


if group.team.organization not in self._user_organizations:

self._user_organizations += [ group.team.organization ]



def is_member(self, organization: Organization) -> bool:
"""Returns true if the current user is a member of the organization

iterates over the user_organizations list and returns true if the user is a member

Returns:
bool: _description_
"""

is_member: bool = False

if organization is None:

return False

if int(organization) in self._user_organizations:

is_member = True

return is_member



def has_organization_permission(self, organization: Organization, permissions_required: str) -> bool:
""" Check if user has permission within organization.

Args:
organization (int): Organization to check.
permissions_required (list): if doing object level permissions, pass in required permission.

Returns:
bool: True for yes.
"""

has_permission: bool = False

if type(organization) is not Organization:

raise TypeError('Organization must be of type Organization')


if type(permissions_required) is not str:

raise TypeError('permissions_required must be of type str')


if not organization:

return has_permission


for team in self._user_teams:

if(
team.organization.id == int(organization)
or getattr(self._app_settings.global_organization, 'id', 0) == int(organization)
):

for permission in team.permissions.all():

assembled_permission = str(permission.content_type.app_label) + '.' + str( permission.codename )

if assembled_permission == permissions_required:

has_permission = True


return has_permission
152 changes: 1 addition & 151 deletions app/access/mixins/organization.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def get_permission_organizations(self, permission: str ) -> list([ int ]):

_permission_organizations: list = []

for team in self.get_user_teams( self.request.user ):
for team in self.request.tenancy._user_teams:

for team_permission in team.permissions.all():

Expand Down Expand Up @@ -258,153 +258,3 @@ def get_permission_required(self) -> str:

This value is used to define the kwarg that is used as the parent objects primary key (pk).
"""


_user_organizations: list = []
"""Cached User Organizations"""


_user_teams: list = []
"""Cached User Teams"""


_user_permissions: list = []
"""Cached User User Permissions"""


def get_user_organizations(self, user: User) -> list([int]):
"""Get the Organization the user is a part of

Args:
user (User): User Making the request

Returns:
list(int()): List containing the organizations the user is a part of.
"""

if self._user_organizations and self._user_teams and self._user_permissions:

return self._user_organizations


teams = Team.objects.all()

_user_organizations: list([ int ]) = []

_user_teams: list([ Team ]) = []

_user_permissions: list([ str ]) = []

for group in user.groups.all():

team = teams.get(pk=group.id)

if team not in _user_teams:

_user_teams += [ team ]

for permission in team.permissions.all():

permission_value = str( permission.content_type.app_label + '.' + permission.codename )

if permission_value not in _user_permissions:

_user_permissions += [ permission_value ]


if team.organization.id not in _user_organizations:

_user_organizations += [ team.organization.id ]


if len(_user_organizations) > 0:

self._user_organizations = _user_organizations

if len(_user_teams) > 0:

self._user_teams = _user_teams

if len(_user_permissions) > 0:

self._user_permissions = _user_permissions


return self._user_organizations



def get_user_teams(self, user: User) -> list([ Team ]):

if not self._user_teams:

self.get_user_organizations( user = user )

return self._user_teams



def has_organization_permission(self, organization: int, permissions_required: list) -> bool:
""" Check if user has permission within organization.

Args:
organization (int): Organization to check.
permissions_required (list): if doing object level permissions, pass in required permission.

Returns:
bool: True for yes.
"""

has_permission: bool = False

if not organization:

return has_permission

from settings.models.app_settings import AppSettings


app_settings = AppSettings.objects.get(
owner_organization = None
)

for team in self.get_user_teams( user = self.request.user ):

if(
team.organization.id == int(organization)
or getattr(app_settings.global_organization, 'id', 0) == int(organization)
):

for permission in team.permissions.all():

assembled_permission = str(permission.content_type.app_label) + '.' + str( permission.codename )

if assembled_permission in permissions_required:

has_permission = True


return has_permission



def is_member(self, organization: int) -> bool:
"""Returns true if the current user is a member of the organization

iterates over the user_organizations list and returns true if the user is a member

Returns:
bool: _description_
"""

is_member: bool = False

if organization is None:

return False

if int(organization) in self.get_user_organizations(self.request.user):

is_member = True

return is_member
Loading
Loading