Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cleanup jobs as Celery periodic tasks #2434

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions kobo/apps/hook/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from django.utils import translation, timezone
from django_celery_beat.models import PeriodicTask

from kpi.utils.lock import lock
from kpi.utils.log import logging
from .constants import HOOK_LOG_FAILED
from .models import Hook, HookLog
Expand Down Expand Up @@ -59,6 +60,7 @@ def retry_all_task(hooklogs_ids):


@shared_task
@lock('failure_reports', timeout=600)
def failures_reports():
"""
Notifies owners' assets by email of hooks failures.
Expand Down
34 changes: 29 additions & 5 deletions kobo/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,10 +445,32 @@ def __init__(self, *args, **kwargs):
# 'schedule': timedelta(hours=12)
#},
# Schedule every day at midnight UTC. Can be customized in admin section
"send-hooks-failures-reports": {
"task": "kobo.apps.hook.tasks.failures_reports",
"schedule": crontab(hour=0, minute=0),
'options': {'queue': 'kpi_queue'}
'send-hooks-failures-reports': {
'task': 'kobo.apps.hook.tasks.failures_reports',
'schedule': crontab(hour=0, minute=0),
'options': {'queue': 'kpi_queue'},
'enabled': False,
jnm marked this conversation as resolved.
Show resolved Hide resolved
},
# Schedule every Saturday at 4:00 AM UTC. Can be customized in admin section
'remove-s3-orphans': {
'task': 'kpi.tasks.remove_s3_orphans',
'schedule': crontab(hour=4, minute=0, day_of_week=6),
'options': {'queue': 'kpi_queue'},
'enabled': False,
},
# Schedule every Friday at 4:00 AM UTC. Can be customized in admin section
'remove-asset-snapshots': {
'task': 'kpi.tasks.remove_asset_snapshots',
'schedule': crontab(hour=4, minute=0, day_of_week=5),
'options': {'queue': 'kpi_queue'},
'enabled': False,
},
# Schedule every Friday at 5:00 AM UTC. Can be customized in admin section
'remove-import-tasks': {
'task': 'kpi.tasks.remove_import_tasks',
'schedule': crontab(hour=5, minute=0, day_of_week=5),
'options': {'queue': 'kpi_queue'},
'enabled': False,
},
}

Expand Down Expand Up @@ -714,6 +736,8 @@ def __init__(self, *args, **kwargs):


SESSION_ENGINE = "redis_sessions.session"
SESSION_REDIS = RedisHelper.config(default="redis://redis_cache:6380/2")
SESSION_REDIS = RedisHelper.session_config(default="redis://redis_cache:6380/2")

LOCK_REDIS = RedisHelper.lock_config(default="redis://redis_cache:6380/3")

TESTING = False
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
from django.db.models import Max
from django.utils import timezone

from .delete_base_command import DeleteBaseCommand
from .remove_base_command import RemoveBaseCommand
from kpi.models import AssetSnapshot


class Command(DeleteBaseCommand):
class Command(RemoveBaseCommand):

help = "Deletes assets snapshots"
help = "Removes asset snapshots"

def _prepare_delete_queryset(self, **options):
days = options["days"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
from django.db import transaction, connection


class DeleteBaseCommand(BaseCommand):
class RemoveBaseCommand(BaseCommand):

def __init__(self, stdout=None, stderr=None, no_color=False):
super(DeleteBaseCommand, self).__init__(stdout=stdout, stderr=stderr, no_color=no_color)
super(RemoveBaseCommand, self).__init__(stdout=stdout, stderr=stderr, no_color=no_color)
self._model = None

def add_arguments(self, parser):
super(DeleteBaseCommand, self).add_arguments(parser)
super(RemoveBaseCommand, self).add_arguments(parser)
parser.add_argument(
"--days",
default=90,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

from django.utils import timezone

from .delete_base_command import DeleteBaseCommand
from .remove_base_command import RemoveBaseCommand
from kpi.models import ImportTask


class Command(DeleteBaseCommand):
class Command(RemoveBaseCommand):

help = "Deletes import tasks"
help = "Removes import tasks"

def _prepare_delete_queryset(self, **options):
days = options["days"]
Expand Down
154 changes: 154 additions & 0 deletions kpi/management/commands/remove_s3_orphans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#!/usr/bin/env python
# vim: ai ts=4 sts=4 et sw=4 coding=utf-8
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import codecs
import re
import sys
import time

from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.core.files.storage import get_storage_class
from django.utils.translation import ugettext as _

from kpi.models.import_export_task import ExportTask


# S3 Monkey Patch
from boto import handler
from boto.resultset import ResultSet
from boto.s3.bucket import Bucket
import xml.sax
import xml.sax.saxutils


def _get_all(self, element_map, initial_query_string='',
             headers=None, **params):
    """
    Monkey-patched replacement for ``boto.s3.bucket.Bucket._get_all``.

    Behaves like boto's original except that the XML response body is run
    through ``fix_bad_characters()`` before SAX parsing (so stray ``&``
    characters in key names don't abort the listing), and any parsing
    failure dumps the offending body to ``/srv/logs/`` for post-mortem
    debugging before re-raising.
    """
    query_args = self._get_all_query_args(
        params,
        initial_query_string=initial_query_string
    )
    response = self.connection.make_request('GET', self.name,
                                            headers=headers,
                                            query_args=query_args)
    body = response.read()

    if response.status == 200:
        rs = ResultSet(element_map)
        h = handler.XmlHandler(rs, self)
        try:
            # Repair malformed ampersands that would otherwise make the
            # SAX parser choke on an otherwise-valid listing.
            xml.sax.parseString(fix_bad_characters(body), h)
        except Exception as e:
            print("XML Parsing Error - {}".format(str(e)))
            # Keep the raw body around for debugging; filename is
            # timestamped so repeated failures don't overwrite each other.
            error_filename = "/srv/logs/s3_body_error-{}.xml".format(str(int(time.time())))
            with open(error_filename, "w") as xmlfile_error:
                xmlfile_error.write("{}\n".format(str(e)))
                xmlfile_error.write(body)
            # NOTE(review): re-raising a bare Exception loses the original
            # traceback — confirm whether callers rely on the message only.
            raise Exception(str(e))
        return rs
    else:
        raise self.connection.provider.storage_response_error(
            response.status, response.reason, body)


def fix_bad_characters(str_):
    """
    Escape stray ``&`` characters so an S3 XML listing body parses.

    Any ``&`` that does not already start one of the five predefined XML
    entities (``&quot; &apos; &lt; &gt; &amp;``) is replaced with
    ``&amp;``. Accepts either text or UTF-8 bytes and returns the same
    type it was given.

    :param str_: str or bytes. Raw XML body from S3.
    :return: str or bytes. Same content with bare ampersands escaped.
    """
    pattern = r"&(?!(quot|apos|lt|gt|amp);)"
    try:
        # Fix: the replacement must be '&amp;', not '&' (which is a no-op).
        str_ = re.sub(pattern, "&amp;", str_)
    except (TypeError, UnicodeDecodeError):
        # `str_` is a byte string: decode, substitute, then re-encode so
        # the caller still receives bytes.
        str_ = re.sub(pattern, "&amp;", str_.decode("utf-8")).encode("utf-8")
    return str_


class Command(BaseCommand):
    """
    Scan every object in the configured S3 bucket and delete files under
    ``*/exports/`` that no longer have a matching ``ExportTask`` row in
    the database (orphans left behind after DB records were removed).
    """

    help = _('Removes orphan files in S3')

    def add_arguments(self, parser):
        super(Command, self).add_arguments(parser)

        parser.add_argument(
            "--dry-run",
            action='store_true',
            default=False,
            help="Do not delete files",
        )

        parser.add_argument(
            "--log-files",
            # NOTE(review): `store_true` combined with `default=True` means
            # this flag can never be turned off from the command line —
            # confirm whether an opt-out was intended.
            action='store_true',
            default=True,
            help="Save deleted files to a CSV",
        )

    def handle(self, *args, **kwargs):
        """Walk the bucket listing, log each orphan and (unless --dry-run)
        delete it along with all of its S3 versions."""

        # Patch boto so malformed XML in the listing response is repaired
        # (see `fix_bad_characters`) instead of aborting the whole scan.
        Bucket._get_all = _get_all

        dry_run = kwargs['dry_run']
        log_files = kwargs['log_files']

        self._s3 = get_storage_class('kpi.utils.extended_s3boto_storage.ExtendedS3BotoStorage')()
        all_files = self._s3.bucket.list()
        size_to_reclaim = 0  # running total of orphaned bytes
        orphans = 0

        now = time.time()
        csv_filepath = '/srv/logs/orphan_files-{}.csv'.format(int(now))

        print('Bucket name: {}'.format(settings.AWS_STORAGE_BUCKET_NAME))
        if dry_run:
            print('Dry run mode activated')
        if log_files:
            print('CSV: {}'.format(csv_filepath))

        if log_files:
            # Write the CSV header once; rows are appended per orphan below.
            with open(csv_filepath, "w") as csv:
                csv.write("type,filename,filesize\n")

        for f in all_files:
            try:
                filename = f.name
                if filename[-1] != "/":  # skip "directory" placeholder keys
                    # KPI Exports
                    if re.match(r"[^\/]*\/exports\/.+", filename):
                        if not ExportTask.objects.filter(result=filename).exists():
                            filesize = f.size
                            orphans += 1
                            size_to_reclaim += filesize
                            if log_files:
                                # Re-opened in append mode for each orphan so
                                # progress is preserved if the scan crashes.
                                csv = codecs.open(csv_filepath, "a", "utf-8")
                                csv.write("{},{},{}\n".format("exports", filename, filesize))
                                csv.close()
                            if not dry_run:
                                self.delete(f)

                if time.time() - now >= 5 * 60:
                    # Heartbeat roughly every five minutes on long scans.
                    print("[{}] Still alive...".format(str(int(time.time()))))
                    now = time.time()

            except Exception as e:
                # Any unexpected error aborts the whole run with a non-zero
                # exit code.
                print("ERROR - {}".format(str(e)))
                sys.exit(-1)

        print("Orphans: {}".format(orphans))
        print("Size: {}".format(self.sizeof_fmt(size_to_reclaim)))

    def delete(self, file_object):
        """Delete `file_object` (and all its S3 versions); log failures
        instead of raising so the scan keeps going."""
        try:
            print("File {} does not exist in DB".format(file_object.name).encode('utf-8'))
            self._s3.delete_all(file_object.name)
        except Exception as e:
            print("ERROR - Could not delete file {} - Reason {}".format(
                file_object.name,
                str(e)))

    @staticmethod
    def sizeof_fmt(num, suffix='B'):
        """Format `num` bytes as a human-readable string, e.g. 1536 -> '1.5KiB'."""
        for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
            if abs(num) < 1024.0:
                return "%3.1f%s%s" % (num, unit, suffix)
            num /= 1024.0
        return "%.1f%s%s" % (num, 'Yi', suffix)
21 changes: 21 additions & 0 deletions kpi/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
from django.core.management import call_command

from kpi.models import ImportTask, ExportTask
from kpi.utils.lock import lock


@shared_task
@lock(key='update_search_index', timeout=3600)
def update_search_index():
    """Rebuild the search index via the `update_index` management command
    (presumably Haystack's — confirm). The Redis lock (1 h timeout) makes
    overlapping runs on other workers no-ops."""
    call_command('update_index', using=['default',], remove=True)

Expand All @@ -25,10 +27,29 @@ def export_in_background(export_task_uid):


@shared_task
@lock(key='sync_kobocat_xforms', timeout=3600 * 24)
def sync_kobocat_xforms(username=None, quiet=True):
    """
    Run the `sync_kobocat_xforms` management command; the Redis lock
    (24 h timeout) prevents overlapping runs across workers.

    :param username: forwarded to the management command unchanged.
    :param quiet: forwarded to the management command unchanged.
    """
    call_command('sync_kobocat_xforms', username=username, quiet=quiet)


@shared_task
def import_survey_drafts_from_dkobo(**kwargs):
    """Run the `import_survey_drafts_from_dkobo` management command,
    forwarding all keyword arguments verbatim. No lock: concurrent runs
    are apparently acceptable here — confirm."""
    call_command('import_survey_drafts_from_dkobo', **kwargs)


@shared_task
@lock(key='remove_s3_orphans', timeout=3600)
def remove_s3_orphans():
    """Periodic cleanup: run the `remove_s3_orphans` management command.
    The Redis lock (1 h timeout) keeps concurrent workers from starting a
    second scan while one is in progress."""
    call_command('remove_s3_orphans')


@shared_task
@lock(key='remove_asset_snapshots', timeout=3600)
def remove_asset_snapshots():
    """Periodic cleanup: run the `remove_asset_snapshots` management
    command under a 1 h Redis lock so only one worker runs it at a time."""
    call_command('remove_asset_snapshots')


@shared_task
@lock(key='remove_import_tasks', timeout=3600)
def remove_import_tasks():
    """Periodic cleanup: run the `remove_import_tasks` management command
    under a 1 h Redis lock so only one worker runs it at a time."""
    call_command('remove_import_tasks')
19 changes: 19 additions & 0 deletions kpi/utils/extended_s3boto_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from storages.backends.s3boto import S3BotoStorage


class ExtendedS3BotoStorage(S3BotoStorage):
    """S3 storage backend extended with versioned-delete support."""

    def delete_all(self, name):
        """
        Delete the key object and all its versions
        :param name: str. S3 key (i.e. path to the file)
        """
        cleaned = self._normalize_name(self._clean_name(name))
        bucket = self.bucket
        # Remove the current object first…
        bucket.delete_key(self._encode_name(cleaned))
        # …then sweep every older version left behind by bucket versioning.
        for version in bucket.list_versions(prefix=cleaned):
            bucket.delete_key(version.name, version_id=version.version_id)
34 changes: 34 additions & 0 deletions kpi/utils/lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals

import os
from functools import wraps

import redis
from django.conf import settings


REDIS_LOCK_CLIENT = redis.Redis(**settings.LOCK_REDIS)


def lock(key='', timeout=None):
    """
    Decorator that guards a function with a distributed Redis lock.

    If the lock is already held (e.g. the same periodic task is still
    running on another Celery worker), the wrapped function is skipped
    and the wrapper returns ``None``.

    :param key: str. Unique suffix identifying the lock; prefixed with
        ``REDIS_KPI_LOCK_PREFIX`` (default ``kpi-lock``).
    :param timeout: int or None. Maximum lifetime of the lock in seconds;
        Redis auto-expires it so a dead worker cannot deadlock the task.
    """

    def _lock(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            ret_value = None
            have_lock = False
            prefix = os.getenv('REDIS_KPI_LOCK_PREFIX', 'kpi-lock')
            key_ = '{}:{}'.format(prefix, key)
            lock_ = REDIS_LOCK_CLIENT.lock(key_, timeout=timeout)
            try:
                # Non-blocking: if someone else holds the lock, skip the run.
                have_lock = lock_.acquire(blocking=False)
                if have_lock:
                    ret_value = func(*args, **kwargs)
            finally:
                if have_lock:
                    try:
                        lock_.release()
                    except redis.exceptions.LockError:
                        # Fix: if the task ran longer than `timeout`, the
                        # lock already expired (and may be owned elsewhere);
                        # releasing would raise and mask a successful run.
                        pass

            return ret_value
        return wrapper
    return _lock
Loading