Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Migration de l'authentification de token vers un middleware + Sauvegarde des anciennes clés d'apis #1531

Merged
merged 13 commits into from
Nov 27, 2024
3 changes: 3 additions & 0 deletions config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,9 @@
"DEFAULT_FILTER_BACKENDS": ["django_filters.rest_framework.DjangoFilterBackend"],
"DEFAULT_PAGINATION_CLASS": "rest_framework.pagination.LimitOffsetPagination",
"PAGE_SIZE": 100,
"DEFAULT_AUTHENTICATION_CLASSES": [
"lemarche.api.authentication.CustomBearerAuthentication",
],
}


Expand Down
89 changes: 89 additions & 0 deletions lemarche/api/authentication.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import logging

from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed

from lemarche.users.models import User


logger = logging.getLogger(__name__)


class CustomBearerAuthentication(BaseAuthentication):
"""
Authentication via:
1. Authorization header: Bearer <token> (recommended).
2. URL parameter ?token=<token> (deprecated, temporary support).
"""

def authenticate(self, request):
token = None
warning_issued = False

# Priority to the Authorization header
auth_header = request.headers.get("Authorization")
if auth_header and auth_header.startswith("Bearer "):
token = auth_header.split("Bearer ")[1]
elif request.GET.get("token"): # Otherwise, try the URL parameter
token = request.GET.get("token")
warning_issued = True
logger.warning("Authentication via URL token detected. This method is deprecated and less secure.")

# If no token is provided
if not token:
return None

# Check the minimum length of the token
if len(token) < 20:
raise AuthenticationFailed("Token too short. Possible security issue detected.")

# Validate the token
try:
user = User.objects.has_api_key().get(api_key=token)
except User.DoesNotExist:
raise AuthenticationFailed("Invalid or expired token")

# Add a warning in the response for URL tokens
if warning_issued:
request._deprecated_auth_warning = True # Marker for middleware or view

# Return the user and the token
return (user, token)

def authenticate_header(self, request):
"""
Returns the expected header for 401 responses.
"""
return 'Bearer realm="api"'


class DeprecationWarningMiddleware:
"""
Middleware to inform users that authentication via URL `?token=` is deprecated.

This middleware checks if the request contains a deprecated authentication token
and adds a warning header to the response if it does.

Attributes:
get_response (callable): The next middleware or view in the chain.

Methods:
__call__(request):
Processes the request and adds a deprecation warning header to the response
if the request contains a deprecated authentication token.
"""

def __init__(self, get_response):
self.get_response = get_response

def __call__(self, request):
response = self.get_response(request)

# Ajoute un warning si le marqueur est défini dans la requête
if hasattr(request, "_deprecated_auth_warning") and request._deprecated_auth_warning:
response.headers["Deprecation-Warning"] = (
"URL token authentication is deprecated and will be removed on 2025/01. "
"Please use Authorization header with Bearer tokens."
)

return response
122 changes: 69 additions & 53 deletions lemarche/api/siaes/tests.py

Large diffs are not rendered by default.

51 changes: 28 additions & 23 deletions lemarche/api/siaes/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from lemarche.api.siaes.filters import SiaeFilter
from lemarche.api.siaes.serializers import SiaeDetailSerializer, SiaeListSerializer
from lemarche.api.utils import BasicChoiceSerializer, BasicChoiceWithParentSerializer, check_user_token
from lemarche.api.utils import BasicChoiceSerializer, BasicChoiceWithParentSerializer
from lemarche.siaes import constants as siae_constants
from lemarche.siaes.models import Siae

Expand All @@ -24,7 +24,9 @@ class SiaeViewSet(mixins.ListModelMixin, mixins.RetrieveModelMixin, viewsets.Gen
summary="Lister toutes les structures",
tags=[Siae._meta.verbose_name_plural],
parameters=[
OpenApiParameter(name="token", description="Token Utilisateur", required=False, type=str),
OpenApiParameter(
name="token", description="Token Utilisateur (pour compatibilité ancienne)", required=False, type=str
),
],
)
def list(self, request, format=None):
Expand All @@ -33,23 +35,24 @@ def list(self, request, format=None):

<i>Un <strong>token</strong> est nécessaire pour l'accès complet à cette ressource.</i>
"""
if request.method == "GET":
token = request.GET.get("token", None)
if not token:
serializer = SiaeListSerializer(
self.get_queryset()[:10],
many=True,
)
return Response(serializer.data)
else:
check_user_token(token)
return super().list(request, format)
if request.user.is_authenticated:
# Utilisateur authentifié : accès complet
return super().list(request, format)
else:
# Utilisateur non authentifié : limiter à 10 résultats
serializer = SiaeListSerializer(
self.get_queryset()[:10],
many=True,
)
return Response(serializer.data)

@extend_schema(
summary="Détail d'une structure (par son id)",
tags=[Siae._meta.verbose_name_plural],
parameters=[
OpenApiParameter(name="token", description="Token Utilisateur", required=False, type=str),
OpenApiParameter(
name="token", description="Token Utilisateur (pour compatibilité ancienne)", required=False, type=str
),
],
responses=SiaeDetailSerializer,
)
Expand All @@ -65,7 +68,9 @@ def retrieve(self, request, pk=None, format=None):
summary="Détail d'une structure (par son slug)",
tags=[Siae._meta.verbose_name_plural],
parameters=[
OpenApiParameter(name="token", description="Token Utilisateur", required=False, type=str),
OpenApiParameter(
name="token", description="Token Utilisateur (pour compatibilité ancienne)", required=False, type=str
),
],
responses=SiaeDetailSerializer,
)
Expand All @@ -82,7 +87,9 @@ def retrieve_by_slug(self, request, slug=None, format=None):
summary="Détail d'une structure (par son siren)",
tags=[Siae._meta.verbose_name_plural],
parameters=[
OpenApiParameter(name="token", description="Token Utilisateur", required=False, type=str),
OpenApiParameter(
name="token", description="Token Utilisateur (pour compatibilité ancienne)", required=False, type=str
),
],
responses=SiaeDetailSerializer,
)
Expand All @@ -100,7 +107,9 @@ def retrieve_by_siren(self, request, siren=None, format=None):
summary="Détail d'une structure (par son siret)",
tags=[Siae._meta.verbose_name_plural],
parameters=[
OpenApiParameter(name="token", description="Token Utilisateur", required=False, type=str),
OpenApiParameter(
name="token", description="Token Utilisateur (pour compatibilité ancienne)", required=False, type=str
),
],
responses=SiaeDetailSerializer,
)
Expand All @@ -115,29 +124,25 @@ def retrieve_by_siret(self, request, siret=None, format=None):
return self._list_return(request, queryset, format)

def _retrieve_return(self, request, queryset, format):
token = request.GET.get("token", None)
if not token:
if not request.user.is_authenticated:
serializer = SiaeListSerializer(
queryset,
many=False,
)
else:
check_user_token(token)
serializer = SiaeDetailSerializer(
queryset,
many=False,
)
return Response(serializer.data)

def _list_return(self, request, queryset, format):
token = request.GET.get("token", None)
if not token:
if not request.user.is_authenticated:
serializer = SiaeListSerializer(
queryset,
many=True,
)
else:
check_user_token(token)
serializer = SiaeDetailSerializer(
queryset,
many=True,
Expand Down
11 changes: 7 additions & 4 deletions lemarche/api/tenders/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from django.test import TestCase
from django.urls import reverse

from lemarche.api.utils import generate_random_string
from lemarche.perimeters.factories import PerimeterFactory
from lemarche.sectors.factories import SectorFactory
from lemarche.tenders import constants as tender_constants
Expand Down Expand Up @@ -45,10 +46,11 @@
class TenderCreateApiTest(TestCase):
@classmethod
def setUpTestData(cls):
cls.url = reverse("api:tenders-list") + "?token=admin"
cls.user_token = generate_random_string()
cls.url = reverse("api:tenders-list") + "?token=" + cls.user_token
cls.user = UserFactory()
cls.user_buyer = UserFactory(kind=User.KIND_BUYER, company_name="Entreprise Buyer")
cls.user_with_token = UserFactory(email="[email protected]", api_key="admin")
cls.user_with_token = UserFactory(email="[email protected]", api_key=cls.user_token)
cls.perimeter = PerimeterFactory()
cls.sector_1 = SectorFactory()
cls.sector_2 = SectorFactory()
Expand Down Expand Up @@ -256,8 +258,9 @@ def test_create_tender_with_distance_location(self):
class TenderCreateApiPartnerTest(TestCase):
@classmethod
def setUpTestData(cls):
cls.url = reverse("api:tenders-list") + "?token=approch"
cls.user_partner_with_token = UserFactory(email="[email protected]", api_key="approch")
cls.api_token_approch = generate_random_string()
cls.url = reverse("api:tenders-list") + "?token=" + cls.api_token_approch
cls.user_partner_with_token = UserFactory(email="[email protected]", api_key=cls.api_token_approch)

def test_partner_approch_can_create_tender(self):
with self.settings(PARTNER_APPROCH_USER_ID=self.user_partner_with_token.id):
Expand Down
6 changes: 3 additions & 3 deletions lemarche/api/tenders/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
from django.utils import timezone
from drf_spectacular.utils import OpenApiParameter, extend_schema
from rest_framework import mixins, viewsets
from rest_framework.permissions import IsAuthenticated

from lemarche.api.tenders.serializers import TenderSerializer
from lemarche.api.utils import BasicChoiceSerializer, check_user_token
from lemarche.api.utils import BasicChoiceSerializer
from lemarche.tenders import constants as tender_constants
from lemarche.tenders.models import Tender
from lemarche.users import constants as user_constants
Expand All @@ -16,6 +17,7 @@


class TenderViewSet(mixins.CreateModelMixin, viewsets.GenericViewSet):
permission_classes = [IsAuthenticated]
serializer_class = TenderSerializer

@extend_schema(
Expand All @@ -26,8 +28,6 @@ class TenderViewSet(mixins.CreateModelMixin, viewsets.GenericViewSet):
],
)
def create(self, request, *args, **kwargs):
token = request.GET.get("token", None)
check_user_token(token)
return super().create(request, args, kwargs)

def perform_create(self, serializer: TenderSerializer):
Expand Down
Loading
Loading