Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cleanup jobs as Celery periodic tasks #2434

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions kobo/apps/hook/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from django.utils import translation, timezone
from django_celery_beat.models import PeriodicTask

from kpi.utils.lock import lock
from kpi.utils.log import logging
from .constants import HOOK_LOG_FAILED
from .models import Hook, HookLog
Expand Down Expand Up @@ -56,6 +57,7 @@ def retry_all_task(hooklogs_ids):


@shared_task
@lock('failure_reports', timeout=600)
def failures_reports():
"""
Notifies owners' assets by email of hooks failures.
Expand Down
36 changes: 31 additions & 5 deletions kobo/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,32 @@ def __init__(self, *args, **kwargs):

CELERY_BEAT_SCHEDULE = {
# Schedule every day at midnight UTC. Can be customized in admin section
"send-hooks-failures-reports": {
"task": "kobo.apps.hook.tasks.failures_reports",
"schedule": crontab(hour=0, minute=0),
'options': {'queue': 'kpi_queue'}
'send-hooks-failures-reports': {
'task': 'kobo.apps.hook.tasks.failures_reports',
'schedule': crontab(hour=0, minute=0),
'options': {'queue': 'kpi_queue'},
'enabled': True,
},
# Schedule every Saturday at 4:00 AM UTC. Can be customized in admin section
'remove-s3-orphans': {
'task': 'kpi.tasks.remove_s3_orphans',
'schedule': crontab(hour=4, minute=0, day_of_week=6),
'options': {'queue': 'kpi_queue'},
'enabled': False,
},
# Schedule every Friday at 4:00 AM UTC. Can be customized in admin section
'remove-asset-snapshots': {
'task': 'kpi.tasks.remove_asset_snapshots',
'schedule': crontab(hour=4, minute=0, day_of_week=5),
'options': {'queue': 'kpi_queue'},
'enabled': False,
},
# Schedule every Friday at 5:00 AM UTC. Can be customized in admin section
'remove-import-tasks': {
'task': 'kpi.tasks.remove_import_tasks',
'schedule': crontab(hour=5, minute=0, day_of_week=5),
'options': {'queue': 'kpi_queue'},
'enabled': False,
},
}

Expand Down Expand Up @@ -677,4 +699,8 @@ def __init__(self, *args, **kwargs):
MONGO_DB = MONGO_CONNECTION[MONGO_DATABASE['NAME']]

SESSION_ENGINE = "redis_sessions.session"
SESSION_REDIS = RedisHelper.config(default="redis://redis_cache:6380/2")
SESSION_REDIS = RedisHelper.session_config(default="redis://redis_cache:6380/2")

LOCK_REDIS = RedisHelper.lock_config(default="redis://redis_cache:6380/3")

TESTING = False
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
from django.utils import timezone

from kpi.models import AssetSnapshot
from .delete_base_command import DeleteBaseCommand
from .remove_base_command import RemoveBaseCommand


class Command(DeleteBaseCommand):
class Command(RemoveBaseCommand):

help = "Deletes assets snapshots"
help = "Removes asset snapshots"

def _prepare_delete_queryset(self, **options):
days = options["days"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
import sys

from django.core.management.base import BaseCommand
from django.utils import timezone
from django.db import transaction, connection


# TODO: Remove `delete_base_command` from the output of `./manage.py --help`
# TODO: Remove `remove_base_command` from the output of `./manage.py --help`
# or print an informative message if someone tries to use it. Currently,
# it just raises `AttributeError: 'module' object has no attribute 'Command'`


class DeleteBaseCommand(BaseCommand):
class RemoveBaseCommand(BaseCommand):

def __init__(self, stdout=None, stderr=None, no_color=False):
super().__init__(stdout=stdout, stderr=stderr, no_color=no_color)
Expand Down Expand Up @@ -46,6 +46,13 @@ def add_arguments(self, parser):
help="Run `VACUUM FULL` instead of `VACUUM`.",
)

parser.add_argument(
"--dry-run",
action='store_true',
default=False,
help="Print out what will be deleted without deleting it",
)

def handle(self, *args, **options):

chunks = options["chunks"]
Expand All @@ -61,6 +68,21 @@ def handle(self, *args, **options):
chunks_counter = 1
total = delete_queryset.count()

if options["dry_run"]:
try:
first = delete_queryset.order_by('date_created').first()
if first:
days_ago = '. Oldest is {} days'.format(
(timezone.now() - first.date_created).days,
)
else:
days_ago = ''
self.stdout.write("{} items to delete{}".format(total, days_ago))
except Exception as err:
pass

return

for record_id in delete_queryset.values_list("id", flat=True).iterator():

chunked_delete_ids.append(record_id)
Expand All @@ -81,22 +103,22 @@ def handle(self, *args, **options):
chunks_counter += 1

# Print new line
print("")
self.stdout.write("")

if vacuum is True or vacuum_full is True:
self._do_vacuum(vacuum_full)

print("Done!")
self.stdout.write("Done!")

def _prepare_delete_queryset(self, **options):
    """
    Build the queryset of records to delete.

    Abstract hook: the base command does not know which model it operates
    on, so every concrete command must override this method.

    :param options: the parsed command-line options passed to `handle()`.
    :raises NotImplementedError: always, when not overridden.
        `NotImplementedError` is a subclass of `Exception`, so callers that
        caught the previous bare `Exception` keep working.
    """
    raise NotImplementedError("Must be implemented in child class")

def _do_vacuum(self, full=False):
cursor = connection.cursor()
if full:
print("Vacuuming (full) table {}...".format(self._model._meta.db_table))
self.stdout.write("Vacuuming (full) table {}...".format(self._model._meta.db_table))
cursor.execute("VACUUM FULL {}".format(self._model._meta.db_table))
else:
print("Vacuuming table {}...".format(self._model._meta.db_table))
self.stdout.write("Vacuuming table {}...".format(self._model._meta.db_table))
cursor.execute("VACUUM {}".format(self._model._meta.db_table))
connection.commit()
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@

from django.utils import timezone

from .delete_base_command import DeleteBaseCommand
from .remove_base_command import RemoveBaseCommand
from kpi.models import ImportTask


class Command(DeleteBaseCommand):
class Command(RemoveBaseCommand):

help = "Deletes import tasks"
help = "Removes import tasks"

def _prepare_delete_queryset(self, **options):
days = options["days"]
Expand Down
170 changes: 170 additions & 0 deletions kpi/management/commands/remove_s3_orphans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#!/usr/bin/env python
# vim: ai ts=4 sts=4 et sw=4 coding=utf-8
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import codecs
import re
import sys
import time

from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.core.files.storage import get_storage_class
from django.utils.translation import ugettext as _

from kpi.models.import_export_task import ExportTask


# S3 Monkey Patch
from boto import handler
from boto.resultset import ResultSet
from boto.s3.bucket import Bucket
import xml.sax
import xml.sax.saxutils


def _get_all(self, element_map, initial_query_string='',
             headers=None, **params):
    """
    The purpose of this method is to be used to monkey-patch
    `boto.s3.bucket.Bucket._get_all()`. The original doesn't handle
    correctly bad characters and crashes because of `xml.sax.parseString`
    which can't parse `body` as valid `XML`.

    :param element_map: passed to `ResultSet` to build the result objects.
    :param initial_query_string: forwarded to `_get_all_query_args()`.
    :param headers: optional headers forwarded to the `GET` request.
    :param params: extra query parameters forwarded to
        `_get_all_query_args()`.
    :return: the populated `ResultSet`.
    :raises Exception: when the (sanitized) body still cannot be parsed; the
        message is the parser's error message.
    :raises: the provider's storage response error when the HTTP status is
        not 200.
    """
    query_args = self._get_all_query_args(
        params,
        initial_query_string=initial_query_string
    )
    response = self.connection.make_request('GET', self.name,
                                            headers=headers,
                                            query_args=query_args)
    body = response.read()

    if response.status != 200:
        raise self.connection.provider.storage_response_error(
            response.status, response.reason, body)

    rs = ResultSet(element_map)
    h = handler.XmlHandler(rs, self)
    try:
        xml.sax.parseString(fix_bad_characters(body), h)
    except Exception as e:
        # Once patched in, `self` is a boto `Bucket`, not a management
        # command: it has no `stdout` wrapper, so write to the process
        # stdout directly instead of `self.stdout`.
        sys.stdout.write("XML Parsing Error - {}\n".format(str(e)))
        error_filename = "/srv/logs/s3_body_error-{}.xml".format(str(int(time.time())))
        try:
            # Binary mode: `body` is the raw response payload and may not be
            # text, so encode the message and write everything as bytes.
            with open(error_filename, "wb") as xmlfile_error:
                xmlfile_error.write("{}\n".format(str(e)).encode("utf-8"))
                if isinstance(body, bytes):
                    xmlfile_error.write(body)
                else:
                    xmlfile_error.write(body.encode("utf-8"))
        except (IOError, OSError) as dump_error:
            # The dump is a debugging aid only; failing to write it must not
            # hide the parsing error raised below.
            sys.stdout.write("Could not write {} - {}\n".format(
                error_filename, str(dump_error)))
        raise Exception(str(e))
    return rs


def fix_bad_characters(str_):
    """
    Escape every `&` that does not start one of the five predefined XML
    entities, by replacing it with `&amp;`.

    The entities `&quot;`, `&apos;`, `&lt;`, `&gt;` and `&amp;` are left
    untouched.

    Example:
        `&foo;` becomes `&amp;foo;` but `&lt;` stays `&lt;`

    :param str_: text (`str`) or raw payload (`bytes`) to sanitize.
    :return: the sanitized value, of the same type as the input.
    """
    if isinstance(str_, bytes):
        # Work directly on bytes: no decoding step that could fail on
        # payloads which are not valid UTF-8.
        return re.sub(br"&(?!(quot|apos|lt|gt|amp);)", b"&amp;", str_)
    return re.sub(r"&(?!(quot|apos|lt|gt|amp);)", "&amp;", str_)


class Command(BaseCommand):
    """
    Management command that looks for export files in the S3 bucket which
    are not referenced by any `ExportTask` and deletes them.

    Options:
        --dry-run       only report orphans, do not delete anything
        --log-files     record every orphan in a CSV file (default)
        --no-log-files  do not write the CSV file
    """
    help = _('Removes orphan files in S3')

    def add_arguments(self, parser):
        """
        Register the command-line options of the command.
        """
        super(Command, self).add_arguments(parser)

        parser.add_argument(
            "--dry-run",
            action='store_true',
            default=False,
            help="Do not delete files",
        )

        parser.add_argument(
            "--log-files",
            action='store_true',
            default=True,
            help="Save deleted files to a CSV",
        )

        # `--log-files` defaults to True and is a `store_true` flag, so on
        # its own it can never be switched off. This companion flag writes
        # `False` to the same destination.
        parser.add_argument(
            "--no-log-files",
            action='store_false',
            dest='log_files',
            help="Do not save deleted files to a CSV",
        )

    def handle(self, *args, **kwargs):
        """
        Walk through every key of the bucket, count (and optionally log and
        delete) the export files that have no matching `ExportTask`.

        Exits the process with status -1 when the bucket name is not
        configured or when an unexpected error occurs while scanning.
        """
        # Replace boto's listing method with the version that tolerates bad
        # characters in the XML response.
        Bucket._get_all = _get_all

        dry_run = kwargs['dry_run']
        log_files = kwargs['log_files']

        # Validate the configuration before touching the storage backend.
        if not settings.AWS_STORAGE_BUCKET_NAME:
            self.stdout.write('`AWS_STORAGE_BUCKET_NAME` is not set. '
                              'Please check your settings')
            sys.exit(-1)

        self._s3 = get_storage_class('kpi.utils.extended_s3boto_storage.ExtendedS3BotoStorage')()
        all_files = self._s3.bucket.list()
        size_to_reclaim = 0
        orphans = 0

        last_heartbeat = time.time()
        csv_filepath = '/srv/logs/orphan_files-{}.csv'.format(int(last_heartbeat))

        self.stdout.write('Bucket name: {}'.format(settings.AWS_STORAGE_BUCKET_NAME))
        if dry_run:
            self.stdout.write('Dry run mode activated')
        if log_files:
            self.stdout.write('CSV: {}'.format(csv_filepath))
            with codecs.open(csv_filepath, "w", "utf-8") as csv_file:
                csv_file.write("type,filename,filesize\n")

        # Compiled once instead of on every key of the bucket.
        exports_pattern = re.compile(r"[^\/]*\/exports\/.+")

        for f in all_files:
            try:
                filename = f.name
                # Keys ending with a slash are "directories": skip them.
                # `endswith` also copes with an empty key, unlike `[-1]`.
                if not filename.endswith("/"):
                    # KPI Exports
                    if exports_pattern.match(filename):
                        if not ExportTask.objects.filter(result=filename).exists():
                            filesize = f.size
                            orphans += 1
                            size_to_reclaim += filesize
                            if log_files:
                                self._log_orphan(csv_filepath, "exports",
                                                 filename, filesize)
                            if not dry_run:
                                self.delete(f)

                # Print a heartbeat every 5 minutes on long runs.
                if time.time() - last_heartbeat >= 5 * 60:
                    self.stdout.write("[{}] Still alive...".format(str(int(time.time()))))
                    last_heartbeat = time.time()

            except Exception as e:
                self.stdout.write("ERROR - {}".format(str(e)))
                sys.exit(-1)

        self.stdout.write("Orphans: {}".format(orphans))
        self.stdout.write("Size: {}".format(self.sizeof_fmt(size_to_reclaim)))

    @staticmethod
    def _log_orphan(csv_filepath, type_, filename, filesize):
        """
        Append one `type,filename,filesize` row to the CSV report.

        The file is re-opened for each row so that the report is complete up
        to the last orphan found even if the command is interrupted.
        """
        with codecs.open(csv_filepath, "a", "utf-8") as csv_file:
            csv_file.write("{},{},{}\n".format(type_, filename, filesize))

    def delete(self, file_object):
        """
        Delete `file_object` from the bucket.

        Failures are reported on stdout and do not stop the command.
        """
        # The message is written as text: the command's `stdout` wrapper
        # expects `str`, and passing encoded bytes made the call fail inside
        # the `try` below, so the file was reported as an error and never
        # deleted.
        self.stdout.write("File {} does not exist in DB".format(file_object.name))
        try:
            self._s3.delete_all(file_object.name)
        except Exception as e:
            self.stdout.write("ERROR - Could not delete file {} - Reason {}".format(
                file_object.name,
                str(e)))

    @staticmethod
    def sizeof_fmt(num, suffix='B'):
        """
        Format a size in bytes as a human-readable string using binary
        prefixes (e.g. `1536` -> `1.5KiB`).
        """
        for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
            if abs(num) < 1024.0:
                return "%3.1f%s%s" % (num, unit, suffix)
            num /= 1024.0
        return "%.1f%s%s" % (num, 'Yi', suffix)
24 changes: 17 additions & 7 deletions kpi/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@
from django.core.management import call_command

from kpi.models import ImportTask, ExportTask


@shared_task
def update_search_index():
call_command('update_index', using=['default',], remove=True)
from kpi.utils.lock import lock


@shared_task
Expand All @@ -23,10 +19,24 @@ def export_in_background(export_task_uid):


@shared_task
@lock(key='sync_kobocat_xforms', timeout=3600 * 24)
def sync_kobocat_xforms(username=None, quiet=True):
call_command('sync_kobocat_xforms', username=username, quiet=quiet)


@shared_task
def import_survey_drafts_from_dkobo(**kwargs):
call_command('import_survey_drafts_from_dkobo', **kwargs)
@lock(key='remove_s3_orphans', timeout=3600)
def remove_s3_orphans():
call_command('remove_s3_orphans')


@shared_task
@lock(key='remove_asset_snapshots', timeout=3600)
def remove_asset_snapshots():
    """
    Celery task which runs the `remove_asset_snapshots` management command
    with its default options.

    NOTE(review): `lock` presumably prevents overlapping runs for the given
    key, for at most `timeout` seconds - confirm against `kpi.utils.lock`.
    """
    call_command('remove_asset_snapshots')


@shared_task
@lock(key='remove_import_tasks', timeout=3600)
def remove_import_tasks():
    """
    Celery task which runs the `remove_import_tasks` management command
    with its default options.

    NOTE(review): `lock` presumably prevents overlapping runs for the given
    key, for at most `timeout` seconds - confirm against `kpi.utils.lock`.
    """
    call_command('remove_import_tasks')
Loading