Skip to content

Commit

Permalink
Add repair_user_data.py
Browse files Browse the repository at this point in the history
  • Loading branch information
jon-ide committed Jul 4, 2024
1 parent 66567c7 commit a0c3a1c
Showing 1 changed file with 231 additions and 0 deletions.
231 changes: 231 additions & 0 deletions webapp/home/repair_user_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
from datetime import datetime
import hashlib
import json
import os
import shutil

from webapp.config import Config
from webapp.views.collaborations.collaborations import change_user_login

REPAIR_LOG_FILE = '__repair_user_data.log'

def log_info(msg):
    """
    Append one line of text to the repair log file (REPAIR_LOG_FILE).
    :param msg: The text to be logged; a newline is appended to it
    """
    with open(REPAIR_LOG_FILE, mode='a') as log_file:
        line = msg + '\n'
        log_file.write(line)


def generate_ezeml_user_data_dirs(cname, uid, sub):
    """
    Build the paths of the ezeml user data directories that correspond to the uid and sub values.
    Each directory name is the common name (spaces replaced by underscores), a hyphen, and the MD5 hex
    digest of the identifier. Paths are only computed; nothing is created or checked on disk.
    :param cname: The user's common name
    :param uid: The user's unique identifier
    :param sub: The user's subject value
    :return: A tuple (uid_dir, sub_dir) of paths under Config.USER_DATA_DIR
    """
    name_prefix = cname.replace(" ", "_") + "-"

    def dir_for(identifier):
        # The hash is used purely to derive a directory name from the identifier.
        digest = hashlib.md5(identifier.encode("utf-8")).hexdigest()
        return os.path.join(Config.USER_DATA_DIR, name_prefix + digest)

    return dir_for(uid), dir_for(sub)


def is_repair_needed(cname, idp, uid, sub):
    """
    Decide whether a user's data needs to be repaired, logging the arguments and the decision.
    :param cname: The user's common name
    :param idp: The user's identity provider
    :param uid: The user's unique identifier
    :param sub: The user's subject value
    A repair is only considered when the identity provider is 'google':
    - For any other idp the result is (False, None, None).
    - For 'google', the uid-based and sub-based directory paths are generated.
    - If the sub-based directory is not present on disk, the result is (False, uid_dir, None).
    - Otherwise the result is (True, uid_dir, sub_dir).
    Returns a tuple of three values:
    - A boolean indicating if the repair is needed.
    - The uid-based directory path (None only when the idp is not 'google'). It is returned whether or not
      the directory exists; callers must check for existence themselves.
    - The sub-based directory path (None unless a repair is needed).
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_info(f"***** {timestamp} ********************")
    log_info(f"is_repair_needed: {cname} {idp} {uid} {sub}")

    if idp == 'google':
        uid_dir, sub_dir = generate_ezeml_user_data_dirs(cname, uid, sub)
        if os.path.exists(sub_dir):
            log_info(f' returns True, {uid_dir}, {sub_dir}')
            return True, uid_dir, sub_dir
        # No sub-based directory on disk, so there is nothing to repair.
        log_info(f' returns False, {uid_dir}, None')
        return False, uid_dir, None

    log_info(' returns False, None, None')
    return False, None, None


def copy_dir_to_backup(dir):
    """
    Copy a directory into the "uid_dirs" backup area under the user data directory, leaving the
    original in place. The operation is recorded in the repair log.
    :param dir: The directory to be copied
    """
    backups_root = os.path.join(Config.USER_DATA_DIR, "__repair_user_data_backups")
    destination = os.path.join(backups_root, "uid_dirs")
    log_info(f"copy_dir_to_backup: {dir} {destination}")
    copy_directory(dir, destination)


def move_dir_to_backup(dir):
    """
    Move a directory into the "sub_dirs" backup area under the user data directory, so that it no
    longer exists at its original location. The operation is recorded in the repair log.
    :param dir: The directory to be moved
    """
    backups_root = os.path.join(Config.USER_DATA_DIR, "__repair_user_data_backups")
    destination = os.path.join(backups_root, "sub_dirs")
    log_info(f"move_dir_to_backup: {dir} {destination}")
    move_directory(dir, destination)


def copy_sub_dir_to_uid_dir(sub_dir, uid_dir):
    """
    Copy the whole tree of the sub directory to the uid directory and record the operation in the
    repair log. An already existing uid directory is tolerated (dirs_exist_ok=True).
    :param sub_dir: The sub directory to be copied
    :param uid_dir: The uid directory to which the sub directory is copied
    """
    log_info(f"copy_sub_dir_to_uid_dir: {sub_dir} {uid_dir}")
    shutil.copytree(
        sub_dir,
        uid_dir,
        dirs_exist_ok=True,
    )


def merge_sub_dir_to_uid_dir(sub_dir, uid_dir):
    """
    Merge the contents of the sub directory into the uid directory.
    :param sub_dir: The sub directory to be merged
    :param uid_dir: The uid directory into which the sub directory is merged
    Assumes that both the sub directory and the uid directory exist.
    """
    log_info(f"merge_sub_dir_to_uid_dir: {sub_dir} {uid_dir}")

    # Back up the uid directory first, before anything is copied into it.
    copy_dir_to_backup(uid_dir)

    # The user properties file is left out of the plain file copy because it is merged
    # separately rather than overwritten.
    copy_files_with_exclusions(sub_dir, uid_dir, ["__user_properties__.json"])
    merge_user_properties(uid_dir, sub_dir)


def merge_user_properties(uid_dir, sub_dir):
    """
    Merge the __user_properties__.json files of the uid and sub directories into the uid directory.
    :param uid_dir: The uid directory
    :param sub_dir: The sub directory
    Assumes both directories exist, although the __user_properties__.json files may not.
    - If only the sub file exists, it is copied to the uid directory.
    - If the sub file does not exist, nothing is done.
    - If both exist, the uid file is rewritten with the merged values:
      - data_table_upload_filenames: sorted union of both lists
      - is_first_usage, new_to_badges: True only if True in both files
      - model_has_complex_texttypes: taken from the sub file
      - enable_complex_text_element_editing_global: True if True in either file
      - enable_complex_text_element_editing_documents: entries of both lists, without duplicates,
        uid entries first
    """
    properties_filename = '__user_properties__.json'

    def properties_path(user_dir):
        # merge_sub_dir_to_uid_dir passes directory paths that it also uses directly as filesystem
        # paths, so an existing directory is used as given. Prefixing it with Config.USER_DATA_DIR
        # again would double the prefix whenever USER_DATA_DIR is a relative path.
        if os.path.isdir(user_dir):
            return os.path.join(user_dir, properties_filename)
        # Fall back to treating the argument as a name relative to the user data directory.
        return os.path.join(Config.USER_DATA_DIR, user_dir, properties_filename)

    def get_uploads(data1, data2):
        # Entries are lists in the JSON; convert to tuples so they can be de-duplicated in a set.
        uploads_1 = set(map(tuple, data1.get('data_table_upload_filenames', [])))
        uploads_2 = set(map(tuple, data2.get('data_table_upload_filenames', [])))
        return sorted(map(list, uploads_1 | uploads_2))

    def get_documents(data1, data2):
        key = 'enable_complex_text_element_editing_documents'
        documents = []
        for document in data1.get(key, []) + data2.get(key, []):
            # Keep first-seen order but drop entries present in both files.
            if document not in documents:
                documents.append(document)
        return documents

    uid_path = properties_path(uid_dir)
    sub_path = properties_path(sub_dir)

    if not os.path.exists(sub_path):
        # Nothing to merge in; whatever the uid directory has (if anything) stays as it is.
        return
    if not os.path.exists(uid_path):
        shutil.copy(sub_path, uid_path)
        return

    with open(uid_path, 'r') as uid_file:
        uid_data = json.load(uid_file)
    with open(sub_path, 'r') as sub_file:
        sub_data = json.load(sub_file)

    global_key = 'enable_complex_text_element_editing_global'
    merged = {
        'data_table_upload_filenames': get_uploads(uid_data, sub_data),
        'is_first_usage': uid_data.get('is_first_usage', False) and sub_data.get('is_first_usage', False),
        'new_to_badges': uid_data.get('new_to_badges', False) and sub_data.get('new_to_badges', False),
        'model_has_complex_texttypes': sub_data.get('model_has_complex_texttypes', False),
        global_key: uid_data.get(global_key, False) or sub_data.get(global_key, False),
        'enable_complex_text_element_editing_documents': get_documents(uid_data, sub_data),
    }

    with open(uid_path, 'w') as uid_file:
        json.dump(merged, uid_file, indent=2)


def repair_user_data(cname, idp, uid, sub):
    """
    Repair the user data if needed.
    :param cname: The user's common name
    :param idp: The user's identity provider
    :param uid: The user's unique identifier
    :param sub: The user's subject value
    When a user logs in via google, the data found in a sub-based directory is brought into the
    uid-based directory, the collaborations login is switched to the uid-based one, and the sub-based
    directory is moved to the backups. Nothing is done when is_repair_needed reports no repair.
    """
    needed, uid_dir, sub_dir = is_repair_needed(cname, idp, uid, sub)
    if not needed:
        return

    if os.path.exists(uid_dir):
        # Both the uid-based and sub-based directories exist, so the sub-based directory is merged
        # into the uid-based directory.
        merge_sub_dir_to_uid_dir(sub_dir, uid_dir)
    else:
        # No directory based on the user's uid (email address) was ever created, so the sub-based
        # directory is simply copied to the uid-based directory.
        copy_sub_dir_to_uid_dir(sub_dir, uid_dir)

    # Fix up the collaborations database to use the uid-based login instead of the sub-based login.
    # The login names are the directory base names.
    old_login = os.path.basename(sub_dir)
    new_login = os.path.basename(uid_dir)
    log_info(f"change_user_login: {old_login} {new_login}")
    change_user_login(old_login, new_login)

    # Save the sub-based directory to the backup directory
    move_dir_to_backup(sub_dir)


def copy_directory(src, dst):
    """
    Copy the directory tree src into dst, as dst/<base name of src>.
    :param src: The directory to be copied
    :param dst: The directory that receives the copy; created if it does not exist
    Files already present in an earlier copy are overwritten or kept (dirs_exist_ok=True).
    Raises FileNotFoundError if src does not exist.
    """
    source_missing = not os.path.exists(src)
    if source_missing:
        raise FileNotFoundError(f"Source directory {src} does not exist.")

    # Make sure the receiving directory is there before copying into it.
    if not os.path.exists(dst):
        os.makedirs(dst)

    # The copy lands in a subdirectory of dst named after the source directory.
    target = os.path.join(dst, os.path.basename(src))
    shutil.copytree(src, target, dirs_exist_ok=True)


def move_directory(src, dst):
    """
    Move the directory src into dst, as dst/<base name of src>.
    :param src: The directory to be moved
    :param dst: The directory that receives src; created, including missing parents, if necessary
    Only a stale entry with the same base name as src is replaced. Everything else already in dst is
    left alone, so directories moved there by earlier calls are preserved.
    Raises FileNotFoundError if src does not exist.
    """
    # Check if the source directory exists
    if not os.path.exists(src):
        raise FileNotFoundError(f"Source directory {src} does not exist.")

    # makedirs rather than mkdir: the parent of dst may not have been created yet.
    os.makedirs(dst, exist_ok=True)

    # shutil.move refuses to overwrite an existing dst/<name>, so clear just that one entry instead
    # of wiping all of dst. normpath drops any trailing separator so the base name is never empty.
    target = os.path.join(dst, os.path.basename(os.path.normpath(src)))
    if os.path.isdir(target) and not os.path.islink(target):
        shutil.rmtree(target)
    elif os.path.lexists(target):
        os.remove(target)

    # Move the directory
    shutil.move(src, dst)


def copy_files_with_exclusions(src, dst, exclude_files):
    """
    Copy the directory tree src into dst, skipping files whose names are listed in exclude_files.
    :param src: The directory to be copied from
    :param dst: The directory to be copied into; created if it does not exist
    :param exclude_files: File names (not paths) to be skipped at every level of the tree
    Subdirectories are created in dst as needed, and existing files in dst are overwritten.
    Raises FileNotFoundError if src does not exist.
    """
    if not os.path.exists(src):
        raise FileNotFoundError(f"Source directory {src} does not exist.")

    if not os.path.exists(dst):
        os.makedirs(dst)

    def mirrored(path):
        # Location under dst that corresponds to a path under src.
        return os.path.join(dst, os.path.relpath(path, src))

    for current_dir, subdir_names, file_names in os.walk(src):
        # Subdirectories are created while visiting their parent, so they exist in dst before
        # the walk descends into them (and empty ones are reproduced as well).
        for subdir_name in subdir_names:
            target_dir = mirrored(os.path.join(current_dir, subdir_name))
            if not os.path.exists(target_dir):
                os.makedirs(target_dir)

        for file_name in file_names:
            if file_name in exclude_files:
                continue
            source_file = os.path.join(current_dir, file_name)
            shutil.copy2(source_file, mirrored(source_file))

0 comments on commit a0c3a1c

Please sign in to comment.