diff --git a/app/api/auth.py b/app/api/auth.py index 2f6406976d..794cea20e3 100644 --- a/app/api/auth.py +++ b/app/api/auth.py @@ -308,6 +308,12 @@ def verify_email(): logging.info('Email Verified') return make_response(jsonify(message="Email Verified"), 200) +# implemented regular expression to limit the character set to be used in the emails. +# used bleach library to prevent cross site scripting attacks +import re +from bleach import clean + +EMAIL_REGEX = r"^[a-zA-Z0-9._-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" @auth_routes.route('/resend-verification-email', methods=['POST']) def resend_verification_email(): @@ -317,6 +323,13 @@ def resend_verification_email(): logging.error('Bad Request') raise BadRequestError({'source': ''}, 'Bad Request Error') + if not re.match(EMAIL_REGEX, email): # this portion matches the email with the fixed character set. + logging.error('Invalid email address') + raise BadRequestError({'source': ''}, 'Invalid email address') + + sanitized_email = clean(email) # here the mail is cleaned from XSS attacks + email=sanitized_email + try: user = User.query.filter_by(email=email).one() except NoResultFound: diff --git a/app/api/chat/rocket_chat.py b/app/api/chat/rocket_chat.py index fdb3ea5885..6de4d63737 100644 --- a/app/api/chat/rocket_chat.py +++ b/app/api/chat/rocket_chat.py @@ -1,9 +1,9 @@ import logging -import random import string +import secrets from dataclasses import dataclass from typing import Optional - + import requests from app.api.helpers.db import get_new_identifier, get_or_create @@ -14,6 +14,8 @@ from app.models.video_stream import VideoStream from app.settings import get_settings +from typing import Union + logger = logging.getLogger(__name__) @@ -39,8 +41,8 @@ class RocketChat: def login_url(self) -> str: return self.api_url + '/api/v1/login' - def login(self, user: User, event: Optional[Event] = None, method: str = 'login'): - def save_token(token): + def login(self, user: User, event: Optional[Event] = None, method: str = 'login') -> Union[dict, None]: + def save_token(token)->None: user.rocket_chat_token = token db.session.add(user) db.session.commit() @@ -66,7 +68,7 @@ def register( user: User, event: Optional[Event] = None, username_suffix='', - ): + )->Union[dict,None]: settings = get_settings() register_url = self.api_url + '/api/v1/users.register' register_data = { @@ -104,7 +106,7 @@ def get_token( event: Optional[Event] = None, retried=False, microlocation: Optional[Microlocation] = None, - ): + )->Union[dict,None]: if user.rocket_chat_token: res = requests.post(self.login_url, json=dict(resume=user.rocket_chat_token)) @@ -149,7 +151,7 @@ def get_token_virtual_room( event: Optional[Event] = None, retried=False, videoStream: Optional[VideoStream] = None, - ): + )->Union[dict,None]: if user.rocket_chat_token: res = requests.post(self.login_url, json=dict(resume=user.rocket_chat_token)) @@ -190,7 +192,7 @@ def get_token_virtual_room( return self.register(user, event) - def check_or_create_bot(self): + def check_or_create_bot(self)->User: bot_email = 'open-event-bot@open-event.invalid' bot_user, _ = get_or_create( User, @@ -200,7 +202,7 @@ def check_or_create_bot(self): return bot_user - def create_room(self, event: Event, microlocation: Optional[Microlocation], data): + def create_room(self, event: Event, microlocation: Optional[Microlocation], data)->None: bot_token = data['token'] bot_id = data['res']['data']['userId'] if microlocation: @@ -234,7 +236,7 @@ def create_room(self, event: Event, microlocation: Optional[Microlocation], data def create_room_virtual_room( self, event: Event, videoStream: Optional[VideoStream], data - ): + )->None: bot_token = data['token'] bot_id = data['res']['data']['userId'] if videoStream: @@ -268,7 +270,7 @@ def create_room_virtual_room( def add_in_room( self, event: Event, rocket_user_id, microlocation: Optional[Microlocation] = None - ): + )->None: bot = self.check_or_create_bot() data = self.get_token(bot) @@ -299,7 +301,7 @@ def add_in_room( def add_in_room_virtual_room( self, event: Event, rocket_user_id, videoStream: Optional[VideoStream] = None - ): + )->None: bot = self.check_or_create_bot() data = self.get_token_virtual_room(bot, videoStream=videoStream) if (not event.chat_room_id) or (videoStream and not videoStream.chat_room_id): @@ -328,15 +330,21 @@ def add_in_room_virtual_room( raise RocketChatException('Error while adding user', response=res) -def generate_pass(size=10, chars=string.ascii_lowercase + string.digits): - return ''.join(random.choice(chars) for _ in range(size)) +def generate_pass(size=10, chars=string.digits + string.ascii_letters + string.punctuation)->str: + # Error handled cases for negative size and no password characters. + # Secrets used instead of random, due to the code being used by company. + if size < 0: + raise ValueError("Negative length for password not allowed") + elif not chars: + raise ValueError("Password cannot be empty") + return ''.join(secrets.choice(chars) for _ in range(size)) def get_rocket_chat_token( user: User, event: Optional[Event] = None, microlocation: Optional[Microlocation] = None, -): +)->dict: settings = get_settings() if not (api_url := settings['rocket_chat_url']): raise RocketChatException( @@ -351,7 +359,7 @@ def get_rocket_chat_token_virtual_room( user: User, event: Optional[Event] = None, videoStream: Optional[VideoStream] = None, -): +)->Union[dict,None]: settings = get_settings() if not (api_url := settings['rocket_chat_url']): raise RocketChatException( @@ -362,7 +370,7 @@ def get_rocket_chat_token_virtual_room( return rocket_chat.get_token_virtual_room(user, event, videoStream=videoStream) -def rename_rocketchat_room(event: Event): +def rename_rocketchat_room(event: Event)-> None: settings = get_settings() if not event.chat_room_id or not (api_url := settings['rocket_chat_url']): return