From 2a70d49f578496073360f2bb2487c77394e36084 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20L=C3=A9ger?= Date: Tue, 9 Oct 2018 10:08:05 -0400 Subject: [PATCH 01/12] Draft - script to detect which attachment files are orphans --- .../management/commands/remove_s3_orphans.py | 113 ++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100644 onadata/apps/logger/management/commands/remove_s3_orphans.py diff --git a/onadata/apps/logger/management/commands/remove_s3_orphans.py b/onadata/apps/logger/management/commands/remove_s3_orphans.py new file mode 100644 index 000000000..92a391f95 --- /dev/null +++ b/onadata/apps/logger/management/commands/remove_s3_orphans.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python +# vim: ai ts=4 sts=4 et sw=4 coding=utf-8 +# -*- coding: utf-8 -*- +#from __future__ import unicode_literals +import codecs +import re +import sys +import time + +from django.core.management.base import BaseCommand, CommandError +from django.core.files.storage import get_storage_class +from django.utils.translation import ugettext as _, ugettext_lazy + +from onadata.apps.logger.models import Attachment + +# S3 Monkey Patch +import boto +from boto import handler +from boto.resultset import ResultSet +from boto.s3.bucket import Bucket +import xml.sax +import xml.sax.saxutils + + +def _get_all(self, element_map, initial_query_string='', + headers=None, **params): + query_args = self._get_all_query_args( + params, + initial_query_string=initial_query_string + ) + response = self.connection.make_request('GET', self.name, + headers=headers, + query_args=query_args) + body = response.read() + + if response.status == 200: + rs = ResultSet(element_map) + h = handler.XmlHandler(rs, self) + try: + xml.sax.parseString(fix_bad_characters(body), h) + except Exception as e: + print("XML Parsing Error - {}".format(str(e))) + error_filename = "/srv/logs/s3_body_error-{}.xml".format(str(int(time.time()))) + with open(error_filename, "w") as xmlfile_error: + 
xmlfile_error.write("{}\n".format(str(e))) + xmlfile_error.write(body) + raise Exception(str(e)) + return rs + else: + raise self.connection.provider.storage_response_error( + response.status, response.reason, body) + +def fix_bad_characters(str_): + + str_ = re.sub(r"&(?!(quot|apos|lt|gt|amp);)", "&", str_) + return str_ + + +class Command(BaseCommand): + help = ugettext_lazy("Removes attachments orphans from S3") + + def handle(self, *args, **kwargs): + + Bucket._get_all = _get_all + + s3 = get_storage_class('storages.backends.s3boto.S3BotoStorage')() + all_files = s3.bucket.list() + size_to_reclaim = 0 + orphans = 0 + + now = time.time() + csv_filepath = "/srv/logs/ocha_s3_orphans.csv" + + with open(csv_filepath, "w") as csv: + csv.write("filename,filesize\n") + + for f in all_files: + try: + filename = f.name + if filename[-1] != "/" and re.match(r"[^\/]*\/attachments\/[^\/]*\/[^\/]*\/.+", filename): + filesize = f.size + + clean_filename = filename + for auto_suffix in ["-large", "-medium", "-small"]: + if filename[-(len(auto_suffix) + 4):-4] == auto_suffix: + clean_filename = filename[:-(len(auto_suffix) + 4)] + filename[-4:] + break + + if not Attachment.objects.filter(media_file=clean_filename).exists(): + orphans += 1 + size_to_reclaim += filesize + csv = codecs.open(csv_filepath, "a", "utf-8") + csv.write("{},{}\n".format(filename, filesize)) + csv.close() + print("File does not exist:") + print("\t{}".format(filename)) + + if time.time() - now >= 5 * 60: + print("[{}] Still alive...".format(str(int(time.time())))) + now = time.time() + + except Exception as e: + print("ERROR - {}".format(str(e))) + + print("Orphans: {}".format(orphans)) + print("Size: {}".format(self.sizeof_fmt(size_to_reclaim))) + + def sizeof_fmt(self, num, suffix='B'): + for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: + if abs(num) < 1024.0: + return "%3.1f%s%s" % (num, unit, suffix) + num /= 1024.0 + return "%.1f%s%s" % (num, 'Yi', suffix) \ No newline at end of file From 
4bc93976a06d8482662a4f15205aa83db6c384be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20L=C3=A9ger?= Date: Thu, 11 Oct 2018 18:06:48 -0400 Subject: [PATCH 02/12] Checks exports too --- .../management/commands/remove_s3_orphans.py | 84 ++++++++++++++----- 1 file changed, 64 insertions(+), 20 deletions(-) diff --git a/onadata/apps/logger/management/commands/remove_s3_orphans.py b/onadata/apps/logger/management/commands/remove_s3_orphans.py index 92a391f95..648b6a1e0 100644 --- a/onadata/apps/logger/management/commands/remove_s3_orphans.py +++ b/onadata/apps/logger/management/commands/remove_s3_orphans.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # vim: ai ts=4 sts=4 et sw=4 coding=utf-8 # -*- coding: utf-8 -*- -#from __future__ import unicode_literals +from __future__ import unicode_literals import codecs import re import sys @@ -9,9 +9,13 @@ from django.core.management.base import BaseCommand, CommandError from django.core.files.storage import get_storage_class +from django.db import connection +from django.db.models import Value as V +from django.db.models.functions import Concat from django.utils.translation import ugettext as _, ugettext_lazy from onadata.apps.logger.models import Attachment +from onadata.apps.viewer.models import Export # S3 Monkey Patch import boto @@ -50,9 +54,15 @@ def _get_all(self, element_map, initial_query_string='', raise self.connection.provider.storage_response_error( response.status, response.reason, body) + def fix_bad_characters(str_): - str_ = re.sub(r"&(?!(quot|apos|lt|gt|amp);)", "&", str_) + try: + str_ = re.sub(r"&(?!(quot|apos|lt|gt|amp);)", "&", str_) + except Exception as e: + # Try to force unicode + str_ = re.sub(r"&(?!(quot|apos|lt|gt|amp);)", "&", unicode(str_, "utf-8")) + str_ = str_.encode("utf-8") return str_ @@ -72,28 +82,60 @@ def handle(self, *args, **kwargs): csv_filepath = "/srv/logs/ocha_s3_orphans.csv" with open(csv_filepath, "w") as csv: - csv.write("filename,filesize\n") + 
csv.write("type,filename,filesize\n") for f in all_files: try: filename = f.name - if filename[-1] != "/" and re.match(r"[^\/]*\/attachments\/[^\/]*\/[^\/]*\/.+", filename): - filesize = f.size - - clean_filename = filename - for auto_suffix in ["-large", "-medium", "-small"]: - if filename[-(len(auto_suffix) + 4):-4] == auto_suffix: - clean_filename = filename[:-(len(auto_suffix) + 4)] + filename[-4:] - break - - if not Attachment.objects.filter(media_file=clean_filename).exists(): - orphans += 1 - size_to_reclaim += filesize - csv = codecs.open(csv_filepath, "a", "utf-8") - csv.write("{},{}\n".format(filename, filesize)) - csv.close() - print("File does not exist:") - print("\t{}".format(filename)) + if filename[-1] != "/": + if re.match(r"[^\/]*\/attachments\/[^\/]*\/[^\/]*\/.+", filename): + clean_filename = filename + for auto_suffix in ["-large", "-medium", "-small"]: + if filename[-(len(auto_suffix) + 4):-4] == auto_suffix: + clean_filename = filename[:-(len(auto_suffix) + 4)] + filename[-4:] + break + + if not Attachment.objects.filter(media_file=clean_filename).exists(): + filesize = f.size + orphans += 1 + size_to_reclaim += filesize + csv = codecs.open(csv_filepath, "a", "utf-8") + csv.write("{},{},{}\n".format("attachment", filename, filesize)) + csv.close() + print("File {} does not exist".format(filename)) + + elif re.match(r"[^\/]*\/exports\/[^\/]*\/[^\/]*\/.+", filename): + #KC Export + if not Export.objects.annotate(fullpath=Concat("filedir", + V("/"), "filename"))\ + .filter(fullpath=filename).exists(): + filesize = f.size + orphans += 1 + size_to_reclaim += filesize + csv = codecs.open(csv_filepath, "a", "utf-8") + csv.write("{},{},{}\n".format("attachment", filename, filesize)) + csv.close() + print("File {} does not exist".format(filename)) + + elif re.match(r"[^\/]*\/exports\/.+", filename): + #KPI Export + does_exist = False + with connection.cursor() as cursor: + cursor.execute("SELECT EXISTS(SELECT id FROM kpi_exporttask WHERE result = 
%s)", [filename]) + try: + row = cursor.fetchone() + does_exist = row[0] + except: + pass + + if not does_exist: + filesize = f.size + orphans += 1 + size_to_reclaim += filesize + csv = codecs.open(csv_filepath, "a", "utf-8") + csv.write("{},{},{}\n".format("attachment", filename, filesize)) + csv.close() + print("File {} does not exist".format(filename)) if time.time() - now >= 5 * 60: print("[{}] Still alive...".format(str(int(time.time())))) @@ -101,6 +143,8 @@ def handle(self, *args, **kwargs): except Exception as e: print("ERROR - {}".format(str(e))) + sys.exit() + print("Orphans: {}".format(orphans)) print("Size: {}".format(self.sizeof_fmt(size_to_reclaim))) From a00beaeb70d1eb855a0da8b9e3eb2a1f610dc46c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20L=C3=A9ger?= Date: Wed, 17 Apr 2019 09:15:55 -0400 Subject: [PATCH 03/12] Delete file when it's not detected in DB --- .../management/commands/remove_s3_orphans.py | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/onadata/apps/logger/management/commands/remove_s3_orphans.py b/onadata/apps/logger/management/commands/remove_s3_orphans.py index 648b6a1e0..6f7efdbce 100644 --- a/onadata/apps/logger/management/commands/remove_s3_orphans.py +++ b/onadata/apps/logger/management/commands/remove_s3_orphans.py @@ -79,7 +79,7 @@ def handle(self, *args, **kwargs): orphans = 0 now = time.time() - csv_filepath = "/srv/logs/ocha_s3_orphans.csv" + csv_filepath = "/srv/logs/orphans_files.csv" with open(csv_filepath, "w") as csv: csv.write("type,filename,filesize\n") @@ -102,7 +102,7 @@ def handle(self, *args, **kwargs): csv = codecs.open(csv_filepath, "a", "utf-8") csv.write("{},{},{}\n".format("attachment", filename, filesize)) csv.close() - print("File {} does not exist".format(filename)) + self.delete(f) elif re.match(r"[^\/]*\/exports\/[^\/]*\/[^\/]*\/.+", filename): #KC Export @@ -115,7 +115,7 @@ def handle(self, *args, **kwargs): csv = codecs.open(csv_filepath, "a", "utf-8") 
csv.write("{},{},{}\n".format("attachment", filename, filesize)) csv.close() - print("File {} does not exist".format(filename)) + self.delete(f) elif re.match(r"[^\/]*\/exports\/.+", filename): #KPI Export @@ -135,7 +135,7 @@ def handle(self, *args, **kwargs): csv = codecs.open(csv_filepath, "a", "utf-8") csv.write("{},{},{}\n".format("attachment", filename, filesize)) csv.close() - print("File {} does not exist".format(filename)) + self.delete(f) if time.time() - now >= 5 * 60: print("[{}] Still alive...".format(str(int(time.time())))) @@ -145,13 +145,21 @@ def handle(self, *args, **kwargs): print("ERROR - {}".format(str(e))) sys.exit() - print("Orphans: {}".format(orphans)) print("Size: {}".format(self.sizeof_fmt(size_to_reclaim))) + def delete(self, file_object): + try: + print("File {} does not exist in DB".format(file_object.name).encode('utf-8')) + file_object.delete() + except Exception as e: + print("ERROR - Could not delete file {} - Reason {}".format( + file_object.name, + str(e))) + def sizeof_fmt(self, num, suffix='B'): for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: if abs(num) < 1024.0: return "%3.1f%s%s" % (num, unit, suffix) num /= 1024.0 - return "%.1f%s%s" % (num, 'Yi', suffix) \ No newline at end of file + return "%.1f%s%s" % (num, 'Yi', suffix) From d441be6cb69323bb3af29e3a74243395713646ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20L=C3=A9ger?= Date: Wed, 17 Apr 2019 11:31:04 -0400 Subject: [PATCH 04/12] Delete all files with their previous versions --- .../management/commands/remove_s3_orphans.py | 17 +++++++++-------- onadata/libs/utils/extended_s3boto_storage.py | 19 +++++++++++++++++++ 2 files changed, 28 insertions(+), 8 deletions(-) create mode 100644 onadata/libs/utils/extended_s3boto_storage.py diff --git a/onadata/apps/logger/management/commands/remove_s3_orphans.py b/onadata/apps/logger/management/commands/remove_s3_orphans.py index 6f7efdbce..0b02e287b 100644 --- 
a/onadata/apps/logger/management/commands/remove_s3_orphans.py +++ b/onadata/apps/logger/management/commands/remove_s3_orphans.py @@ -12,13 +12,12 @@ from django.db import connection from django.db.models import Value as V from django.db.models.functions import Concat -from django.utils.translation import ugettext as _, ugettext_lazy +from django.utils.translation import ugettext_lazy from onadata.apps.logger.models import Attachment from onadata.apps.viewer.models import Export # S3 Monkey Patch -import boto from boto import handler from boto.resultset import ResultSet from boto.s3.bucket import Bucket @@ -73,8 +72,8 @@ def handle(self, *args, **kwargs): Bucket._get_all = _get_all - s3 = get_storage_class('storages.backends.s3boto.S3BotoStorage')() - all_files = s3.bucket.list() + self._s3 = get_storage_class('onadata.libs.utils.extended_s3boto_storage.ExtendedS3BotoStorage')() + all_files = self._s3.bucket.list() size_to_reclaim = 0 orphans = 0 @@ -105,7 +104,7 @@ def handle(self, *args, **kwargs): self.delete(f) elif re.match(r"[^\/]*\/exports\/[^\/]*\/[^\/]*\/.+", filename): - #KC Export + # KC Export if not Export.objects.annotate(fullpath=Concat("filedir", V("/"), "filename"))\ .filter(fullpath=filename).exists(): @@ -118,7 +117,8 @@ def handle(self, *args, **kwargs): self.delete(f) elif re.match(r"[^\/]*\/exports\/.+", filename): - #KPI Export + # KPI Export. + # TODO Create the same command in KPI after merging `two-databases`. 
does_exist = False with connection.cursor() as cursor: cursor.execute("SELECT EXISTS(SELECT id FROM kpi_exporttask WHERE result = %s)", [filename]) @@ -151,13 +151,14 @@ def handle(self, *args, **kwargs): def delete(self, file_object): try: print("File {} does not exist in DB".format(file_object.name).encode('utf-8')) - file_object.delete() + self._s3.delete_all(file_object.name) except Exception as e: print("ERROR - Could not delete file {} - Reason {}".format( file_object.name, str(e))) - def sizeof_fmt(self, num, suffix='B'): + @staticmethod + def sizeof_fmt(num, suffix='B'): for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: if abs(num) < 1024.0: return "%3.1f%s%s" % (num, unit, suffix) diff --git a/onadata/libs/utils/extended_s3boto_storage.py b/onadata/libs/utils/extended_s3boto_storage.py new file mode 100644 index 000000000..0b37ed7e6 --- /dev/null +++ b/onadata/libs/utils/extended_s3boto_storage.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from storages.backends.s3boto import S3BotoStorage + + +class ExtendedS3BotoStorage(S3BotoStorage): + + def delete_all(self, name): + """ + Delete the key object and all its versions + :param name: str. S3 key (i.e. 
path to the file) + """ + name = self._normalize_name(self._clean_name(name)) + self.bucket.delete_key(self._encode_name(name)) + + # Delete all previous versions + for versioned_key in self.bucket.list_versions(prefix=name): + self.bucket.delete_key(versioned_key.name, version_id=versioned_key.version_id) From 208d82ed054d06d5141b01c0d4995ec4a962683b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20L=C3=A9ger?= Date: Tue, 1 Oct 2019 16:14:11 -0400 Subject: [PATCH 05/12] Copy lock mechanism from kpi --- onadata/libs/utils/lock.py | 34 +++++++++++++++ onadata/libs/utils/redis_helper.py | 70 ++++++++++++++++++++++-------- onadata/settings/common.py | 4 +- 3 files changed, 88 insertions(+), 20 deletions(-) create mode 100644 onadata/libs/utils/lock.py diff --git a/onadata/libs/utils/lock.py b/onadata/libs/utils/lock.py new file mode 100644 index 000000000..eeb2c2b84 --- /dev/null +++ b/onadata/libs/utils/lock.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, unicode_literals + +import os +from functools import wraps + +import redis +from django.conf import settings + + +REDIS_LOCK_CLIENT = redis.Redis(**settings.LOCK_REDIS) + + +def lock(key='', timeout=None): + + def _lock(func): + @wraps(func) + def wrapper(*args, **kwargs): + ret_value = None + have_lock = False + prefix = os.getenv('REDIS_KOBOCAT_LOCK_PREFIX', 'kc-lock') + key_ = '{}:{}'.format(prefix, key) + lock_ = REDIS_LOCK_CLIENT.lock(key_, timeout=timeout) + try: + have_lock = lock_.acquire(blocking=False) + if have_lock: + ret_value = func(*args, **kwargs) + finally: + if have_lock: + lock_.release() + + return ret_value + return wrapper + return _lock diff --git a/onadata/libs/utils/redis_helper.py b/onadata/libs/utils/redis_helper.py index 27afa351a..154f64e10 100644 --- a/onadata/libs/utils/redis_helper.py +++ b/onadata/libs/utils/redis_helper.py @@ -4,6 +4,8 @@ import os import re +from django.core.exceptions import ImproperlyConfigured + class RedisHelper(object): 
""" @@ -14,26 +16,56 @@ class RedisHelper(object): """ @staticmethod - def config(default=None): + def config(url_variable, default=None): + """ + Parses `url_variable` environment variable to return a dict with + expected attributes for redis clients. + + :return: dict + """ + + redis_connection_url = os.getenv(url_variable, default) + match = re.match(r'redis://(:(?P[^@]*)@)?(?P[^:]+):(?P\d+)(/(?P\d+))?', + redis_connection_url) + if not match: + raise ImproperlyConfigured( + "Could not parse `{}`. Please verify your settings.".format( + url_variable) + ) + + redis_connection_dict = { + 'host': match.group('host'), + 'port': match.group('port'), + 'db': match.group('index') or 0, + 'password': match.group('password') + } + return redis_connection_dict + + @classmethod + def session_config(cls, default=None): """ + Parses `REDIS_SESSION_URL` environment variable to return a dict with + expected attributes for django redis session. + + :return: dict + """ + + redis_connection_dict = cls.config('REDIS_SESSION_URL', default) + redis_connection_dict.update({ + 'prefix': os.getenv('REDIS_SESSION_PREFIX', 'session'), + 'socket_timeout': os.getenv('REDIS_SESSION_SOCKET_TIMEOUT', 1), + }) + return redis_connection_dict + + @classmethod + def lock_config(cls, default=None): + """ + Parses `REDIS_LOCK_URL` environment variable to return a dict + for lock mechanism based on redis. 
+ :return: dict """ - try: - redis_connection_url = os.getenv("REDIS_SESSION_URL", default) - match = re.match(r"redis://(:(?P[^@]*)@)?(?P[^:]+):(?P\d+)(/(?P\d+))?", - redis_connection_url) - if not match: - raise Exception() - - redis_connection_dict = { - "host": match.group("host"), - "port": match.group("port"), - "db": match.group("index") or 0, - "password": match.group("password"), - "prefix": os.getenv("REDIS_SESSION_PREFIX", "session"), - "socket_timeout": os.getenv("REDIS_SESSION_SOCKET_TIMEOUT", 1), - } - return redis_connection_dict - except Exception as e: - raise Exception("Could not parse Redis session URL. Please verify 'REDIS_SESSION_URL' value") + redis_connection_dict = cls.config('REDIS_LOCK_URL', default) + + return redis_connection_dict diff --git a/onadata/settings/common.py b/onadata/settings/common.py index 7d9a16f9d..275afcf13 100644 --- a/onadata/settings/common.py +++ b/onadata/settings/common.py @@ -562,4 +562,6 @@ def skip_suspicious_operations(record): USE_X_FORWARDED_HOST = True SESSION_ENGINE = "redis_sessions.session" -SESSION_REDIS = RedisHelper.config(default="redis://redis_cache:6380/2") +SESSION_REDIS = RedisHelper.session_config(default="redis://redis_cache:6380/2") + +LOCK_REDIS = RedisHelper.lock_config(default="redis://redis_cache:6380/3") From cf5c5f60be7910d67c64238305d8efb5a061f519 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20L=C3=A9ger?= Date: Wed, 2 Oct 2019 16:01:11 -0400 Subject: [PATCH 06/12] Stantardize remove command names and add these commands as periodic tasks --- ...ns.py => remove_duplicated_submissions.py} | 8 +- ...elete_revisions.py => remove_revisions.py} | 2 +- .../management/commands/remove_s3_orphans.py | 83 +++++++++++-------- onadata/apps/logger/tasks.py | 50 +++++++---- onadata/settings/kc_environ.py | 15 ++++ 5 files changed, 102 insertions(+), 56 deletions(-) rename onadata/apps/logger/management/commands/{clean_duplicated_submissions.py => remove_duplicated_submissions.py} (97%) rename 
onadata/apps/logger/management/commands/{delete_revisions.py => remove_revisions.py} (98%) diff --git a/onadata/apps/logger/management/commands/clean_duplicated_submissions.py b/onadata/apps/logger/management/commands/remove_duplicated_submissions.py similarity index 97% rename from onadata/apps/logger/management/commands/clean_duplicated_submissions.py rename to onadata/apps/logger/management/commands/remove_duplicated_submissions.py index f73dbf132..f7a02e13b 100644 --- a/onadata/apps/logger/management/commands/clean_duplicated_submissions.py +++ b/onadata/apps/logger/management/commands/remove_duplicated_submissions.py @@ -17,11 +17,11 @@ class Command(BaseCommand): - help = "Deletes duplicated submissions (i.e same `uuid` and same `xml`)" + help = "Removes duplicated submissions (i.e same `uuid` and same `xml`)" def __init__(self, **kwargs): super(Command, self).__init__(**kwargs) - self.__vaccuum = False + self.__vacuum = False self.__users = set([]) def add_arguments(self, parser): @@ -110,7 +110,7 @@ def handle(self, *args, **options): duplicated_instance_ids, purge) - if not self.__vaccuum: + if not self.__vacuum: if purge: self.stdout.write('No instances have been purged.') else: @@ -130,7 +130,7 @@ def handle(self, *args, **options): def __clean_up(self, instance_id_ref, duplicated_instance_ids, purge): if instance_id_ref is not None and len(duplicated_instance_ids) > 0: - self.__vaccuum = True + self.__vacuum = True with transaction.atomic(): self.stdout.write('Link attachments to instance #{}'.format( instance_id_ref)) diff --git a/onadata/apps/logger/management/commands/delete_revisions.py b/onadata/apps/logger/management/commands/remove_revisions.py similarity index 98% rename from onadata/apps/logger/management/commands/delete_revisions.py rename to onadata/apps/logger/management/commands/remove_revisions.py index fa06972b1..8cb364693 100644 --- a/onadata/apps/logger/management/commands/delete_revisions.py +++ 
b/onadata/apps/logger/management/commands/remove_revisions.py @@ -12,7 +12,7 @@ class Command(RevisionCommand): - help = "Deletes revisions (by chunks) for a given app [and model]" + help = "Removes revisions (by chunks) for a given app [and model]" def add_arguments(self, parser): super(Command, self).add_arguments(parser) diff --git a/onadata/apps/logger/management/commands/remove_s3_orphans.py b/onadata/apps/logger/management/commands/remove_s3_orphans.py index 0b02e287b..25b53de69 100644 --- a/onadata/apps/logger/management/commands/remove_s3_orphans.py +++ b/onadata/apps/logger/management/commands/remove_s3_orphans.py @@ -7,12 +7,12 @@ import sys import time +from django.conf import settings from django.core.management.base import BaseCommand, CommandError from django.core.files.storage import get_storage_class -from django.db import connection from django.db.models import Value as V from django.db.models.functions import Concat -from django.utils.translation import ugettext_lazy +from django.utils.translation import ugettext as _ from onadata.apps.logger.models import Attachment from onadata.apps.viewer.models import Export @@ -66,22 +66,49 @@ def fix_bad_characters(str_): class Command(BaseCommand): - help = ugettext_lazy("Removes attachments orphans from S3") + help = _('Removes orphan files in S3') + + def add_arguments(self, parser): + super(Command, self).add_arguments(parser) + + parser.add_argument( + "--dry-run", + action='store_true', + default=False, + help="Do not delete files", + ) + + parser.add_argument( + "--log-files", + action='store_true', + default=True, + help="Save deleted files to a CSV", + ) def handle(self, *args, **kwargs): Bucket._get_all = _get_all + dry_run = kwargs['dry_run'] + log_files = kwargs['log_files'] + self._s3 = get_storage_class('onadata.libs.utils.extended_s3boto_storage.ExtendedS3BotoStorage')() all_files = self._s3.bucket.list() size_to_reclaim = 0 orphans = 0 now = time.time() - csv_filepath = 
"/srv/logs/orphans_files.csv" + csv_filepath = '/srv/logs/orphan_files-{}.csv'.format(int(now)) + + print('Bucket name: {}'.format(settings.AWS_STORAGE_BUCKET_NAME)) + if dry_run: + print('Dry run mode activated') + if log_files: + print('CSV: {}'.format(csv_filepath)) - with open(csv_filepath, "w") as csv: - csv.write("type,filename,filesize\n") + if log_files: + with open(csv_filepath, "w") as csv: + csv.write("type,filename,filesize\n") for f in all_files: try: @@ -98,10 +125,13 @@ def handle(self, *args, **kwargs): filesize = f.size orphans += 1 size_to_reclaim += filesize - csv = codecs.open(csv_filepath, "a", "utf-8") - csv.write("{},{},{}\n".format("attachment", filename, filesize)) - csv.close() - self.delete(f) + if log_files: + csv = codecs.open(csv_filepath, "a", "utf-8") + csv.write("{},{},{}\n".format("attachment", filename, filesize)) + csv.close() + + if not dry_run: + self.delete(f) elif re.match(r"[^\/]*\/exports\/[^\/]*\/[^\/]*\/.+", filename): # KC Export @@ -111,31 +141,12 @@ def handle(self, *args, **kwargs): filesize = f.size orphans += 1 size_to_reclaim += filesize - csv = codecs.open(csv_filepath, "a", "utf-8") - csv.write("{},{},{}\n".format("attachment", filename, filesize)) - csv.close() - self.delete(f) - - elif re.match(r"[^\/]*\/exports\/.+", filename): - # KPI Export. - # TODO Create the same command in KPI after merging `two-databases`. 
- does_exist = False - with connection.cursor() as cursor: - cursor.execute("SELECT EXISTS(SELECT id FROM kpi_exporttask WHERE result = %s)", [filename]) - try: - row = cursor.fetchone() - does_exist = row[0] - except: - pass - - if not does_exist: - filesize = f.size - orphans += 1 - size_to_reclaim += filesize - csv = codecs.open(csv_filepath, "a", "utf-8") - csv.write("{},{},{}\n".format("attachment", filename, filesize)) - csv.close() - self.delete(f) + if log_files: + csv = codecs.open(csv_filepath, "a", "utf-8") + csv.write("{},{},{}\n".format("export", filename, filesize)) + csv.close() + if not dry_run: + self.delete(f) if time.time() - now >= 5 * 60: print("[{}] Still alive...".format(str(int(time.time())))) @@ -143,7 +154,7 @@ def handle(self, *args, **kwargs): except Exception as e: print("ERROR - {}".format(str(e))) - sys.exit() + sys.exit(-1) print("Orphans: {}".format(orphans)) print("Size: {}".format(self.sizeof_fmt(size_to_reclaim))) diff --git a/onadata/apps/logger/tasks.py b/onadata/apps/logger/tasks.py index 567e7557f..a0229a20a 100644 --- a/onadata/apps/logger/tasks.py +++ b/onadata/apps/logger/tasks.py @@ -1,29 +1,33 @@ -### ISSUE 242 TEMPORARY FIX ### -# See https://github.com/kobotoolbox/kobocat/issues/242 + +import csv +import datetime +import zipfile +from collections import defaultdict +from io import BytesIO + from celery import shared_task +from dateutil import relativedelta +from django.contrib.auth.models import User +from django.core.files.storage import get_storage_class from django.core.management import call_command +from onadata.libs.utils.lock import lock +from .models import Instance, XForm + + +# ## ISSUE 242 TEMPORARY FIX ### +# See https://github.com/kobotoolbox/kobocat/issues/242 + @shared_task(soft_time_limit=600, time_limit=900) +@lock(key='fix_root_node_names', timeout=900) def fix_root_node_names(**kwargs): call_command( 'fix_root_node_names', **kwargs ) +# ##### END ISSUE 242 FIX ###### -###### END ISSUE 242 FIX ###### 
- -import csv -import zipfile -import datetime -from io import BytesIO -from dateutil import relativedelta -from collections import defaultdict - -from django.contrib.auth.models import User -from django.core.files.storage import get_storage_class - -from .models import Instance, XForm @shared_task def generate_stats_zip(output_filename): @@ -99,3 +103,19 @@ def list_created_by_month(model, date_field): csv_io.close() zip_file.close() + + +@shared_task +@lock(key='remove_s3_orphans', timeout=3600) +def remove_s3_orphans(): + call_command('remove_s3_orphans') + + +@shared_task +@lock(key='remove_revisions', timeout=3600) +def remove_revisions(): + # We can also use `keep=1` to keep at least + # on version of each object. + # e.g.: `call_command('remove_revisions', days=90, keep=1)` + call_command('remove_revisions', days=90) + diff --git a/onadata/settings/kc_environ.py b/onadata/settings/kc_environ.py index 1fa5ac96c..2d40a3077 100644 --- a/onadata/settings/kc_environ.py +++ b/onadata/settings/kc_environ.py @@ -3,6 +3,7 @@ import logging import os +from celery.schedules import crontab from celery.signals import after_setup_logger import dj_database_url @@ -273,6 +274,20 @@ def celery_logger_setup_handler(logger, **kwargs): 'schedule': timedelta(hours=6), 'options': {'queue': 'kobocat_queue'} }, + # Schedule every Saturday at 4:00 AM UTC. Can be customized in admin section + 'remove-s3-orphans': { + 'task': 'onadata.apps.logger.tasks.remove_s3_orphans', + 'schedule': crontab(hour=4, minute=0, day_of_week=6), + 'options': {'queue': 'kobocat_queue'}, + 'enabled': False, + }, + # Schedule every day at 5:00 AM UTC. 
Can be customized in admin section + 'remove-revisions': { + 'task': 'onadata.apps.logger.tasks.remove_revisions', + 'schedule': crontab(hour=4, minute=0), + 'options': {'queue': 'kobocat_queue'}, + 'enabled': False, + }, } # ## ISSUE 242 TEMPORARY FIX ### From 68269ecbb430ef9bad073e4c3a793a14336dee4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20L=C3=A9ger?= Date: Fri, 4 Oct 2019 15:24:49 -0400 Subject: [PATCH 07/12] Force keep kpi revisions --- .../logger/management/commands/remove_revisions.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/onadata/apps/logger/management/commands/remove_revisions.py b/onadata/apps/logger/management/commands/remove_revisions.py index 8cb364693..7fa5f26bf 100644 --- a/onadata/apps/logger/management/commands/remove_revisions.py +++ b/onadata/apps/logger/management/commands/remove_revisions.py @@ -4,6 +4,7 @@ from datetime import timedelta import sys +from django.contrib.contenttypes.models import ContentType from django.db import transaction, models, router, connection from django.utils import timezone from reversion.models import Revision, Version @@ -56,6 +57,16 @@ def handle(self, *app_labels, **options): keep_revision_ids = set() # By default, delete nothing. can_delete = False + + asset_content_type = ContentType.objects.get(app_label='kpi', model='asset') + # Force keep assets' revisions even if `self.models()` returns only + # registered models. + keep_revision_ids.update(Version.objects.using(using).filter( + model_db=model_db, + content_type_id=asset_content_type). + values_list('revision_id', flat=True) + ) + # Get all revisions for the given revision manager and model. 
for model in self.get_models(options): if verbosity >= 1: From 921e8225f0fee81d1dbbe024ebe71fb2a55cd8b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20L=C3=A9ger?= Date: Mon, 7 Oct 2019 17:39:07 -0400 Subject: [PATCH 08/12] Do not delete reversion from Asset model --- .../management/commands/remove_revisions.py | 15 +++++---------- onadata/libs/utils/gravatar.py | 12 +++++++++--- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/onadata/apps/logger/management/commands/remove_revisions.py b/onadata/apps/logger/management/commands/remove_revisions.py index 7fa5f26bf..800860e11 100644 --- a/onadata/apps/logger/management/commands/remove_revisions.py +++ b/onadata/apps/logger/management/commands/remove_revisions.py @@ -4,7 +4,6 @@ from datetime import timedelta import sys -from django.contrib.contenttypes.models import ContentType from django.db import transaction, models, router, connection from django.utils import timezone from reversion.models import Revision, Version @@ -58,17 +57,13 @@ def handle(self, *app_labels, **options): # By default, delete nothing. can_delete = False - asset_content_type = ContentType.objects.get(app_label='kpi', model='asset') - # Force keep assets' revisions even if `self.models()` returns only - # registered models. - keep_revision_ids.update(Version.objects.using(using).filter( - model_db=model_db, - content_type_id=asset_content_type). - values_list('revision_id', flat=True) - ) - # Get all revisions for the given revision manager and model. for model in self.get_models(options): + # Force keep assets' revisions even if `self.models()` returns only + # registered models. 
+ if model._meta.verbose_name == 'asset': + continue + if verbosity >= 1: self.stdout.write("Finding stale revisions for {name}".format( name=model._meta.verbose_name, diff --git a/onadata/libs/utils/gravatar.py b/onadata/libs/utils/gravatar.py index 4cffc8bb0..5e25ac612 100644 --- a/onadata/libs/utils/gravatar.py +++ b/onadata/libs/utils/gravatar.py @@ -1,5 +1,11 @@ +# coding: utf-8 +from __future__ import (unicode_literals, print_function, + absolute_import, division) + import hashlib -import urllib + +from django.utils.six.moves.urllib.parse import urlencode +from django.utils.six.moves.urllib.request import urlopen DEFAULT_GRAVATAR = "https://formhub.org/static/images/formhub_avatar.png" GRAVATAR_ENDPOINT = "https://secure.gravatar.com/avatar/" @@ -8,7 +14,7 @@ def get_gravatar_img_link(user): url = GRAVATAR_ENDPOINT +\ - hashlib.md5(user.email.lower()).hexdigest() + "?" + urllib.urlencode({ + hashlib.md5(user.email.lower()).hexdigest() + "?" + urlencode({ 'd': DEFAULT_GRAVATAR, 's': str(GRAVATAR_SIZE) }) return url @@ -17,5 +23,5 @@ def get_gravatar_img_link(user): def gravatar_exists(user): url = GRAVATAR_ENDPOINT +\ hashlib.md5(user.email.lower()).hexdigest() + "?" 
+ "d=404" - exists = urllib.urlopen(url).getcode() != 404 + exists = urlopen(url).getcode() != 404 return exists From 9f010b3f8b0e683a78d48045f861e2b2f413d8fa Mon Sep 17 00:00:00 2001 From: Olivier Leger Date: Fri, 18 Feb 2022 17:25:00 -0500 Subject: [PATCH 09/12] Support Filesystem Storage --- .../management/commands/remove_s3_orphans.py | 177 -------------- .../commands/remove_storage_orphans.py | 229 ++++++++++++++++++ onadata/apps/logger/tasks.py | 14 +- onadata/apps/main/tests/test_form_exports.py | 1 - onadata/libs/utils/extended_s3boto_storage.py | 19 -- onadata/libs/utils/lock.py | 4 +- onadata/settings/base.py | 17 +- onadata/settings/dev.py | 12 - onadata/settings/prod.py | 4 - 9 files changed, 247 insertions(+), 230 deletions(-) delete mode 100644 onadata/apps/logger/management/commands/remove_s3_orphans.py create mode 100644 onadata/apps/logger/management/commands/remove_storage_orphans.py delete mode 100644 onadata/libs/utils/extended_s3boto_storage.py diff --git a/onadata/apps/logger/management/commands/remove_s3_orphans.py b/onadata/apps/logger/management/commands/remove_s3_orphans.py deleted file mode 100644 index 25b53de69..000000000 --- a/onadata/apps/logger/management/commands/remove_s3_orphans.py +++ /dev/null @@ -1,177 +0,0 @@ -#!/usr/bin/env python -# vim: ai ts=4 sts=4 et sw=4 coding=utf-8 -# -*- coding: utf-8 -*- -from __future__ import unicode_literals -import codecs -import re -import sys -import time - -from django.conf import settings -from django.core.management.base import BaseCommand, CommandError -from django.core.files.storage import get_storage_class -from django.db.models import Value as V -from django.db.models.functions import Concat -from django.utils.translation import ugettext as _ - -from onadata.apps.logger.models import Attachment -from onadata.apps.viewer.models import Export - -# S3 Monkey Patch -from boto import handler -from boto.resultset import ResultSet -from boto.s3.bucket import Bucket -import xml.sax -import 
xml.sax.saxutils - - -def _get_all(self, element_map, initial_query_string='', - headers=None, **params): - query_args = self._get_all_query_args( - params, - initial_query_string=initial_query_string - ) - response = self.connection.make_request('GET', self.name, - headers=headers, - query_args=query_args) - body = response.read() - - if response.status == 200: - rs = ResultSet(element_map) - h = handler.XmlHandler(rs, self) - try: - xml.sax.parseString(fix_bad_characters(body), h) - except Exception as e: - print("XML Parsing Error - {}".format(str(e))) - error_filename = "/srv/logs/s3_body_error-{}.xml".format(str(int(time.time()))) - with open(error_filename, "w") as xmlfile_error: - xmlfile_error.write("{}\n".format(str(e))) - xmlfile_error.write(body) - raise Exception(str(e)) - return rs - else: - raise self.connection.provider.storage_response_error( - response.status, response.reason, body) - - -def fix_bad_characters(str_): - - try: - str_ = re.sub(r"&(?!(quot|apos|lt|gt|amp);)", "&", str_) - except Exception as e: - # Try to force unicode - str_ = re.sub(r"&(?!(quot|apos|lt|gt|amp);)", "&", unicode(str_, "utf-8")) - str_ = str_.encode("utf-8") - return str_ - - -class Command(BaseCommand): - help = _('Removes orphan files in S3') - - def add_arguments(self, parser): - super(Command, self).add_arguments(parser) - - parser.add_argument( - "--dry-run", - action='store_true', - default=False, - help="Do not delete files", - ) - - parser.add_argument( - "--log-files", - action='store_true', - default=True, - help="Save deleted files to a CSV", - ) - - def handle(self, *args, **kwargs): - - Bucket._get_all = _get_all - - dry_run = kwargs['dry_run'] - log_files = kwargs['log_files'] - - self._s3 = get_storage_class('onadata.libs.utils.extended_s3boto_storage.ExtendedS3BotoStorage')() - all_files = self._s3.bucket.list() - size_to_reclaim = 0 - orphans = 0 - - now = time.time() - csv_filepath = '/srv/logs/orphan_files-{}.csv'.format(int(now)) - - print('Bucket 
name: {}'.format(settings.AWS_STORAGE_BUCKET_NAME)) - if dry_run: - print('Dry run mode activated') - if log_files: - print('CSV: {}'.format(csv_filepath)) - - if log_files: - with open(csv_filepath, "w") as csv: - csv.write("type,filename,filesize\n") - - for f in all_files: - try: - filename = f.name - if filename[-1] != "/": - if re.match(r"[^\/]*\/attachments\/[^\/]*\/[^\/]*\/.+", filename): - clean_filename = filename - for auto_suffix in ["-large", "-medium", "-small"]: - if filename[-(len(auto_suffix) + 4):-4] == auto_suffix: - clean_filename = filename[:-(len(auto_suffix) + 4)] + filename[-4:] - break - - if not Attachment.objects.filter(media_file=clean_filename).exists(): - filesize = f.size - orphans += 1 - size_to_reclaim += filesize - if log_files: - csv = codecs.open(csv_filepath, "a", "utf-8") - csv.write("{},{},{}\n".format("attachment", filename, filesize)) - csv.close() - - if not dry_run: - self.delete(f) - - elif re.match(r"[^\/]*\/exports\/[^\/]*\/[^\/]*\/.+", filename): - # KC Export - if not Export.objects.annotate(fullpath=Concat("filedir", - V("/"), "filename"))\ - .filter(fullpath=filename).exists(): - filesize = f.size - orphans += 1 - size_to_reclaim += filesize - if log_files: - csv = codecs.open(csv_filepath, "a", "utf-8") - csv.write("{},{},{}\n".format("export", filename, filesize)) - csv.close() - if not dry_run: - self.delete(f) - - if time.time() - now >= 5 * 60: - print("[{}] Still alive...".format(str(int(time.time())))) - now = time.time() - - except Exception as e: - print("ERROR - {}".format(str(e))) - sys.exit(-1) - - print("Orphans: {}".format(orphans)) - print("Size: {}".format(self.sizeof_fmt(size_to_reclaim))) - - def delete(self, file_object): - try: - print("File {} does not exist in DB".format(file_object.name).encode('utf-8')) - self._s3.delete_all(file_object.name) - except Exception as e: - print("ERROR - Could not delete file {} - Reason {}".format( - file_object.name, - str(e))) - - @staticmethod - def 
sizeof_fmt(num, suffix='B'): - for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: - if abs(num) < 1024.0: - return "%3.1f%s%s" % (num, unit, suffix) - num /= 1024.0 - return "%.1f%s%s" % (num, 'Yi', suffix) diff --git a/onadata/apps/logger/management/commands/remove_storage_orphans.py b/onadata/apps/logger/management/commands/remove_storage_orphans.py new file mode 100644 index 000000000..89e9c49b0 --- /dev/null +++ b/onadata/apps/logger/management/commands/remove_storage_orphans.py @@ -0,0 +1,229 @@ +# coding: utf-8 +import csv +import os +import re +import sys +import time + +import boto3 +from django.conf import settings +from django.core.management.base import BaseCommand +from django.core.files.storage import get_storage_class, FileSystemStorage +from django.db.models import Value as V +from django.db.models.functions import Concat + +from onadata.apps.logger.models import Attachment +from onadata.apps.viewer.models import Export + + +class Command(BaseCommand): + help = 'Removes orphan files on storage' + args = '[username]' + + def __init__( + self, stdout=None, stderr=None, no_color=False, force_color=False + ): + super().__init__(stdout, stderr, no_color, force_color) + self._orphans = 0 + self._size_to_reclaim = 0 + self._csv_filepath = '/srv/logs/orphan_files-{}.csv'.format( + int(time.time()) + ) + + def add_arguments(self, parser): + super(Command, self).add_arguments(parser) + + parser.add_argument('username', nargs='?', default=None) + + parser.add_argument( + '--dry-run', + action='store_true', + default=False, + help='Do not delete files', + ) + + parser.add_argument( + '--save-as-csv', + action='store_true', + default=False, + help='Save deleted files to a CSV file', + ) + + parser.add_argument( + '--calculate-size', + action='store_true', + default=False, + help=( + 'Calculate total size reclaimed on storage.\n' + 'Warning, it produces lots of `HEAD` (billed) requests to AWS S3' + ) + ) + + def handle(self, *args, **options): + + dry_run 
= options['dry_run'] + save_as_csv = options['save_as_csv'] + calculate_size = options['calculate_size'] + username = options['username'] + + self._storage_manager = StorageManager(username, calculate_size) + all_files = self._storage_manager.get_files() + + if dry_run: + self.stdout.write('Dry run mode activated') + + if save_as_csv: + with open(self._csv_filepath, 'w', newline='') as csvfile: + writer = csv.DictWriter( + csvfile, fieldnames=['type', 'filepath', 'filesize'] + ) + writer.writeheader() + + for absolute_filepath in all_files: + try: + if not absolute_filepath.endswith('/'): + filepath = self._storage_manager.get_path_from_storage( + absolute_filepath + ) + if re.match(r'[^\/]*\/attachments\/[^\/]*\/[^\/]*\/.+', filepath): + clean_filepath = filepath + for auto_suffix in ['-large', '-medium', '-small']: + filename, extension = os.path.splitext( + os.path.basename(filepath) + ) + # Find real name saved in DB + if filename[-len(auto_suffix):] == auto_suffix: + clean_filepath = ( + filepath[:-(len(auto_suffix) + len(extension))] + + extension + ) + break + + if not Attachment.objects.filter( + media_file=clean_filepath + ).exists(): + self.delete('attachment', absolute_filepath, options) + + elif re.match(r'[^\/]*\/exports\/[^\/]*\/[^\/]*\/.+', filepath): + # KoBoCAT exports + if ( + not Export.objects.annotate( + fullpath=Concat('filedir', V('/'), 'filename') + ) + .filter(fullpath=filepath) + .exists() + ): + self.delete('export', absolute_filepath, options) + + except Exception as e: + self.stderr.write(f'ERROR - {str(e)}') + sys.exit(1) + + self.stdout.write(f'Orphans: {self._orphans}') + if calculate_size: + self.stdout.write(f'Free up space: {self.sizeof_fmt(self._size_to_reclaim)}') + if save_as_csv: + self.stdout.write(f'CSV saved at {self._csv_filepath}') + + def delete(self, orphan_type: str, absolute_filepath: str, options: dict): + + # Get size of the file + filesize = self._storage_manager.get_filesize(absolute_filepath) + filepath = 
self._storage_manager.get_path_from_storage(absolute_filepath) + self._orphans += 1 + self._size_to_reclaim += filesize + + if options['save_as_csv']: + with open(self._csv_filepath, 'a') as csvfile: + writer = csv.writer(csvfile) + writer.writerow([orphan_type, filepath, filesize]) + + if options['verbosity'] > 1: + self.stdout.write( + f'Found {orphan_type}: {filepath} - {self.sizeof_fmt(filesize)}' + ) + + if options['dry_run']: + return + + try: + self._storage_manager.delete(absolute_filepath) + if options['verbosity'] > 1: + self.stdout.write('\tDeleted!') + + except Exception as e: + self.stderr.write( + f'ERROR - Could not delete file {filepath} - Reason {str(e)}' + ) + + @staticmethod + def sizeof_fmt(num, suffix='B'): + for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: + if abs(num) < 1024.0: + return "%3.1f%s%s" % (num, unit, suffix) + num /= 1024.0 + return "%.1f%s%s" % (num, 'Yi', suffix) + + +class StorageManager: + + def __init__(self, username: str, calculate_size: bool): + self._calculate_size = calculate_size + self._username = username + self._storage = get_storage_class()() + self._is_local = isinstance(self._storage, FileSystemStorage) + if not self._is_local: + self._s3_client = boto3.client('s3') + + def delete(self, absolute_filepath: str): + if self._is_local: + os.remove(absolute_filepath) + else: + # Be aware, it does not delete all versions of the file. + # It relies on S3 Lifecycle rules to delete old versions.
+ self._s3_client.Object( + settings.AWS_STORAGE_BUCKET_NAME, absolute_filepath + ).delete() + + def get_files(self): + if self._is_local: + dest = ( + f'{settings.MEDIA_ROOT}/{self._username}' + if self._username + else settings.MEDIA_ROOT + ) + for root, dirs, files in os.walk(dest): + for name in files: + yield os.path.join(root, name) + else: + s3_paginator = self._s3_client.get_paginator('list_objects_v2') + bucket_name = settings.AWS_STORAGE_BUCKET_NAME + prefix = self._username if self._username else '' + for page in s3_paginator.paginate( + Bucket=bucket_name, Prefix=prefix, StartAfter='' + ): + for content in page.get('Contents', ()): + yield content['Key'] + + def get_filesize(self, absolute_filepath: str): + if not self._calculate_size: + return 0 + + if self._is_local: + return os.path.getsize(absolute_filepath) + else: + bucket_name = settings.AWS_STORAGE_BUCKET_NAME + response = self._s3_client.head_object( + Bucket=bucket_name, Key=absolute_filepath + ) + return response['ContentLength'] + + def get_path_from_storage(self, absolute_filepath: str) -> str: + if self._is_local: + return absolute_filepath.replace(settings.MEDIA_ROOT, '') + else: + return absolute_filepath + + @property + def storage(self): + return self._storage diff --git a/onadata/apps/logger/tasks.py b/onadata/apps/logger/tasks.py index fd37318bc..75a26029e 100644 --- a/onadata/apps/logger/tasks.py +++ b/onadata/apps/logger/tasks.py @@ -113,15 +113,15 @@ def list_created_by_month(model, date_field): @shared_task -@lock(key='remove_s3_orphans', timeout=3600) -def remove_s3_orphans(): - call_command('remove_s3_orphans') - - -@shared_task -@lock(key='remove_revisions', timeout=3600) +@lock(key='remove_revisions', timeout=604800) # Lock for one week def remove_revisions(): # We can also use `keep=1` to keep at least # on version of each object. 
# e.g.: `call_command('remove_revisions', days=90, keep=1)` call_command('remove_revisions', days=90) + + +@shared_task +@lock(key='remove_storage_orphans', timeout=604800) # Lock for one week +def remove_storage_orphans(): + call_command('remove_storage_orphans') diff --git a/onadata/apps/main/tests/test_form_exports.py b/onadata/apps/main/tests/test_form_exports.py index 383579714..0f00f9f42 100644 --- a/onadata/apps/main/tests/test_form_exports.py +++ b/onadata/apps/main/tests/test_form_exports.py @@ -2,7 +2,6 @@ import os import time import csv -import tempfile from django.urls import reverse from django.core.files.storage import get_storage_class, FileSystemStorage diff --git a/onadata/libs/utils/extended_s3boto_storage.py b/onadata/libs/utils/extended_s3boto_storage.py deleted file mode 100644 index 0b37ed7e6..000000000 --- a/onadata/libs/utils/extended_s3boto_storage.py +++ /dev/null @@ -1,19 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import unicode_literals - -from storages.backends.s3boto import S3BotoStorage - - -class ExtendedS3BotoStorage(S3BotoStorage): - - def delete_all(self, name): - """ - Delete the key object and all its versions - :param name: str. S3 key (i.e. 
path to the file) - """ - name = self._normalize_name(self._clean_name(name)) - self.bucket.delete_key(self._encode_name(name)) - - # Delete all previous versions - for versioned_key in self.bucket.list_versions(prefix=name): - self.bucket.delete_key(versioned_key.name, version_id=versioned_key.version_id) diff --git a/onadata/libs/utils/lock.py b/onadata/libs/utils/lock.py index eeb2c2b84..1970c2973 100644 --- a/onadata/libs/utils/lock.py +++ b/onadata/libs/utils/lock.py @@ -1,6 +1,4 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, unicode_literals - +# coding: utf-8 import os from functools import wraps diff --git a/onadata/settings/base.py b/onadata/settings/base.py index 5a511d535..391db1393 100644 --- a/onadata/settings/base.py +++ b/onadata/settings/base.py @@ -7,6 +7,7 @@ from urllib.parse import quote_plus import dj_database_url +from celery.schedules import crontab from django.core.exceptions import SuspiciousOperation from pymongo import MongoClient @@ -361,6 +362,8 @@ def skip_suspicious_operations(record): if not os.path.isdir(EMAIL_FILE_PATH): os.mkdir(EMAIL_FILE_PATH) +SESSION_ENGINE = 'redis_sessions.session' +SESSION_REDIS = RedisHelper.session_config(default='redis://redis_cache:6380/2') ################################### # Django Rest Framework settings # @@ -632,13 +635,6 @@ def skip_suspicious_operations(record): 'schedule': timedelta(hours=6), 'options': {'queue': 'kobocat_queue'} }, - # Schedule every Saturday at 4:00 AM UTC. Can be customized in admin section - 'remove-s3-orphans': { - 'task': 'onadata.apps.logger.tasks.remove_s3_orphans', - 'schedule': crontab(hour=4, minute=0, day_of_week=6), - 'options': {'queue': 'kobocat_queue'}, - 'enabled': False, - }, # Schedule every day at 5:00 AM UTC. 
Can be customized in admin section 'remove-revisions': { 'task': 'onadata.apps.logger.tasks.remove_revisions', @@ -646,6 +642,13 @@ def skip_suspicious_operations(record): 'options': {'queue': 'kobocat_queue'}, 'enabled': False, }, + # Schedule every Saturday at 4:00 AM UTC. Can be customized in admin section + 'remove-storage-orphans': { + 'task': 'onadata.apps.logger.tasks.remove_storage_orphans', + 'schedule': crontab(hour=4, minute=0, day_of_week=6), + 'options': {'queue': 'kobocat_queue'}, + 'enabled': False, + }, } CELERY_TASK_DEFAULT_QUEUE = 'kobocat_queue' diff --git a/onadata/settings/dev.py b/onadata/settings/dev.py index 154a64276..4de9a4480 100644 --- a/onadata/settings/dev.py +++ b/onadata/settings/dev.py @@ -2,18 +2,6 @@ from onadata.libs.utils.redis_helper import RedisHelper from .base import * -################################ -# Django Framework settings # -################################ - -LOGGING['root'] = { - 'handlers': ['console'], - 'level': 'DEBUG' -} - -SESSION_ENGINE = "redis_sessions.session" -SESSION_REDIS = RedisHelper.config(default="redis://redis_cache:6380/2") - ################################ # KoBoCAT settings # ################################ diff --git a/onadata/settings/prod.py b/onadata/settings/prod.py index e6384a998..ff46df15c 100644 --- a/onadata/settings/prod.py +++ b/onadata/settings/prod.py @@ -1,5 +1,4 @@ # coding: utf-8 -from onadata.libs.utils.redis_helper import RedisHelper from .base import * ################################ @@ -9,6 +8,3 @@ # Force `DEBUG` and `TEMPLATE_DEBUG` to `False` DEBUG = False TEMPLATES[0]['OPTIONS']['debug'] = False - -SESSION_ENGINE = "redis_sessions.session" -SESSION_REDIS = RedisHelper.config(default="redis://redis_cache:6380/2") From 3508cff85b640b9bd0cdea2045de599db0f4c575 Mon Sep 17 00:00:00 2001 From: Olivier Leger Date: Thu, 10 Mar 2022 11:26:56 -0500 Subject: [PATCH 10/12] utc-timezone --- onadata/settings/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/onadata/settings/base.py b/onadata/settings/base.py index b0efeac6a..ced14e34c 100644 --- a/onadata/settings/base.py +++ b/onadata/settings/base.py @@ -59,7 +59,7 @@ def skip_suspicious_operations(record): # timezone as the operating system. # If running in a Windows environment this must be set to the same as your # system time zone. -TIME_ZONE = 'America/New_York' +TIME_ZONE = 'UTC' USE_TZ = True # Language code for this installation. All choices can be found here: From 2aeccfc2c0bc4c862ed6eac6bc94e355f72f1bda Mon Sep 17 00:00:00 2001 From: Olivier Leger Date: Thu, 10 Mar 2022 11:26:56 -0500 Subject: [PATCH 11/12] Use UTC as timezone --- onadata/settings/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/onadata/settings/base.py b/onadata/settings/base.py index b0efeac6a..ced14e34c 100644 --- a/onadata/settings/base.py +++ b/onadata/settings/base.py @@ -59,7 +59,7 @@ def skip_suspicious_operations(record): # timezone as the operating system. # If running in a Windows environment this must be set to the same as your # system time zone. -TIME_ZONE = 'America/New_York' +TIME_ZONE = 'UTC' USE_TZ = True # Language code for this installation. 
All choices can be found here: From 0cf1c28d9fe7ed44b3119ea2c0fd7fc98e5c9c62 Mon Sep 17 00:00:00 2001 From: Olivier Leger Date: Thu, 10 Mar 2022 14:37:57 -0500 Subject: [PATCH 12/12] Add customizable session prefix for redis, move settings to base --- onadata/settings/base.py | 11 +++++++++++ onadata/settings/dev.py | 9 --------- onadata/settings/prod.py | 5 ----- onadata/settings/testing.py | 2 ++ 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/onadata/settings/base.py b/onadata/settings/base.py index b95adaf7c..3e2939c2c 100644 --- a/onadata/settings/base.py +++ b/onadata/settings/base.py @@ -13,6 +13,7 @@ env = environ.Env() + def skip_suspicious_operations(record): """Prevent django from sending 500 error email notifications for SuspiciousOperation @@ -379,6 +380,16 @@ def skip_suspicious_operations(record): if not os.path.isdir(EMAIL_FILE_PATH): os.mkdir(EMAIL_FILE_PATH) +SESSION_ENGINE = 'redis_sessions.session' +# django-redis-session expects a dictionary with `url` +redis_session_url = env.cache_url( + 'REDIS_SESSION_URL', default='redis://redis_cache:6380/2' +) +SESSION_REDIS = { + 'url': redis_session_url['LOCATION'], + 'prefix': env.str('REDIS_SESSION_PREFIX', 'session'), + 'socket_timeout': env.int('REDIS_SESSION_SOCKET_TIMEOUT', 1), +} ################################### # Django Rest Framework settings # diff --git a/onadata/settings/dev.py b/onadata/settings/dev.py index 59830cde1..d47e84d5e 100644 --- a/onadata/settings/dev.py +++ b/onadata/settings/dev.py @@ -1,15 +1,6 @@ # coding: utf-8 from .base import * -################################ -# Django Framework settings # -################################ - -SESSION_ENGINE = 'redis_sessions.session' -# django-redis-session expects a dictionary with `url` -redis_session_url = env.cache_url('REDIS_SESSION_URL', default='redis://redis_cache:6380/2') -SESSION_REDIS = {'url': redis_session_url['LOCATION']} - ################################ # KoBoCAT settings # 
################################ diff --git a/onadata/settings/prod.py b/onadata/settings/prod.py index 16b7649f8..ff46df15c 100644 --- a/onadata/settings/prod.py +++ b/onadata/settings/prod.py @@ -8,8 +8,3 @@ # Force `DEBUG` and `TEMPLATE_DEBUG` to `False` DEBUG = False TEMPLATES[0]['OPTIONS']['debug'] = False - -SESSION_ENGINE = 'redis_sessions.session' -# django-redis-session expects a dictionary with `url` -redis_session_url = env.cache_url('REDIS_SESSION_URL', default='redis://redis_cache:6380/2') -SESSION_REDIS = {'url': redis_session_url['LOCATION']} \ No newline at end of file diff --git a/onadata/settings/testing.py b/onadata/settings/testing.py index 28184aebc..70f8c057a 100644 --- a/onadata/settings/testing.py +++ b/onadata/settings/testing.py @@ -23,6 +23,8 @@ SECRET_KEY = os.urandom(50).hex() +SESSION_ENGINE = 'django.contrib.sessions.backends.db' + ################################### # Django Rest Framework settings # ###################################