diff --git a/zengine/management_commands.py b/zengine/management_commands.py index 58c9fcdc..d2e9e4f2 100644 --- a/zengine/management_commands.py +++ b/zengine/management_commands.py @@ -12,6 +12,7 @@ import os import sys import tempfile +import threading from distutils.errors import DistutilsError from pyoko.db.adapter.db_riak import BlockSave, BlockDelete @@ -289,7 +290,8 @@ def _init_update_po_files(self, domains): """Update or initialize the `.po` translation files""" for language in settings.TRANSLATIONS: for domain, options in domains.items(): - if language == options['default']: continue # Default language of the domain doesn't need translations + if language == options[ + 'default']: continue # Default language of the domain doesn't need translations if os.path.isfile(_po_path(language, domain)): # If the translation already exists, update it, keeping the parts already translated self._update_po_file(language, domain, options['pot']) @@ -298,13 +300,15 @@ def _init_update_po_files(self, domains): self._init_po_file(language, domain, options['pot']) def _update_po_file(self, language, domain, pot_path): - print('Updating po file for {language} in domain {domain}'.format(language=language, domain=domain)) + print('Updating po file for {language} in domain {domain}'.format(language=language, + domain=domain)) updater = babel_frontend.update_catalog() _setup_babel_command(updater, domain, language, pot_path) _run_babel_command(updater) def _init_po_file(self, language, domain, pot_path): - print('Creating po file for {language} in domain {domain}'.format(language=language, domain=domain)) + print('Creating po file for {language} in domain {domain}'.format(language=language, + domain=domain)) initializer = babel_frontend.init_catalog() _setup_babel_command(initializer, domain, language, pot_path) _run_babel_command(initializer) @@ -328,7 +332,9 @@ def run(self): for language in settings.TRANSLATIONS: for domain, default_lang in settings.TRANSLATION_DOMAINS.items(): if language == default_lang: continue # Default language of the domain doesn't need translations - print('Compiling po file for {language} in domain {domain}'.format(language=language, domain=domain)) + print( + 'Compiling po file for {language} in domain {domain}'.format(language=language, + domain=domain)) compiler = babel_frontend.compile_catalog() _setup_babel_command(compiler, domain, language, _po_path(language, domain)) _run_babel_command(compiler) @@ -352,7 +358,8 @@ def _setup_babel_command(babel, domain, language, input_file): def _po_path(language, domain): - return os.path.join(settings.TRANSLATIONS_DIR, language, 'LC_MESSAGES', '{domain}.po'.format(domain=domain)) + return os.path.join(settings.TRANSLATIONS_DIR, language, 'LC_MESSAGES', + '{domain}.po'.format(domain=domain)) class PrepareMQ(Command): @@ -427,7 +434,19 @@ def list_system_views(self): print(" |_ %s" % view) -class LoadDiagrams(Command): + +class AtomicCounter(): + def __init__(self, initial=0): + self.value = initial + self._lock = threading.Lock() + + def increment(self, num=1): + with self._lock: + self.value += num + return self.value + + +class LoadDiagrams(Command, BaseThreadedCommand): """ Loads wf diagrams from disk to DB """ @@ -437,6 +456,7 @@ class LoadDiagrams(Command): PARAMS = [ {'name': 'wf_path', 'default': None, 'help': 'Only update given BPMN diagram'}, + {'name': 'threads', 'default': 30, 'help': 'Max number of threads. Defaults to 1'}, {'name': 'clear', 'action': 'store_true', 'help': 'Clear all TaskManager related models'}, @@ -451,7 +471,6 @@ def run(self): read workflows, checks if it's updated, tries to update if there aren't any running instances of that wf """ - from zengine.models.workflow_manager import DiagramXML, BPMNWorkflow, RunningInstancesExist from zengine.lib.cache import WFSpecNames if self.manager.args.clear: @@ -462,23 +481,31 @@ def run(self): paths = self.get_wf_from_path(self.manager.args.wf_path) else: paths = self.get_workflows() - count = 0 - for wf_name, content in paths: - key = 'bpmn_workflow_%s' % wf_name - wf, wf_is_new = BPMNWorkflow.objects.get_or_create(name=wf_name, key=key) - content = self._tmp_fix_diagram(content) - diagram, diagram_is_updated = DiagramXML.get_or_create_by_content(wf_name, content) - if wf_is_new or diagram_is_updated or self.manager.args.force: - count += 1 - print("%s created or updated" % wf_name.upper()) - try: - wf.set_xml(diagram, self.manager.args.force) - except RunningInstancesExist as e: - print(e.message) - print("Give \"--force\" parameter to enforce") + + self.counter = AtomicCounter() + + self.do_with_submit(self.load_diagram, paths, threads=self.manager.args.threads) + WFSpecNames().refresh() - print("%s BPMN file loaded" % count) + print("%s BPMN file loaded" % self.counter.value) + + def load_diagram(self, paths): + from zengine.models.workflow_manager import DiagramXML, BPMNWorkflow, RunningInstancesExist + + wf_name, content = paths + key = 'bpmn_workflow_%s' % wf_name + wf, wf_is_new = BPMNWorkflow.objects.get_or_create(name=wf_name, key=key) + content = self._tmp_fix_diagram(content) + diagram, diagram_is_updated = DiagramXML.get_or_create_by_content(wf_name, content) + if wf_is_new or diagram_is_updated or self.manager.args.force: + self.counter.increment() + print("%s created or updated" % wf_name.upper()) + try: + wf.set_xml(diagram, self.manager.args.force) + except RunningInstancesExist as e: + print(e.message) + print("Give \"--force\" parameter to enforce") def _clear_models(self): from zengine.models.workflow_manager import DiagramXML, BPMNWorkflow, WFInstance, \ @@ -585,7 +612,7 @@ def check_redis(): try: cache.ping() - print(CheckList.OKGREEN+"{0}Redis is working{1}"+CheckList.ENDC) + print(CheckList.OKGREEN + "{0}Redis is working{1}" + CheckList.ENDC) except ConnectionError as e: print(__(u"{0}Redis is not working{1} ").format(CheckList.FAIL, CheckList.ENDC), e.message) @@ -667,7 +694,7 @@ def run(self): if prefix_name != "": if prefix_name != 'all': for name in prefix_name.split(','): - keys = cache.keys(name+"*") + keys = cache.keys(name + "*") for key in keys: cache.delete(key) print("%d object(s) deleted from cache with PREFIX %s " % (len(keys), name))