From 861239ffc63a80a857fc1b19f1715efdd6a77c9e Mon Sep 17 00:00:00 2001 From: Mayo Faulkner Date: Fri, 16 Sep 2022 15:31:08 +0100 Subject: [PATCH 1/2] widefield multiprocessing jobs --- deploy/serverpc/crontab/large_jobs.py | 41 ++++++++++++++++----------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/deploy/serverpc/crontab/large_jobs.py b/deploy/serverpc/crontab/large_jobs.py index d6120850..e757f3ea 100644 --- a/deploy/serverpc/crontab/large_jobs.py +++ b/deploy/serverpc/crontab/large_jobs.py @@ -2,6 +2,7 @@ import time import logging from pathlib import Path +import multiprocessing from one.api import ONE from ibllib.pipes.local_server import task_queue @@ -11,21 +12,27 @@ subjects_path = Path('/mnt/s0/Data/Subjects/') sleep_time = 3600 -try: - one = ONE(cache_rest=None) - waiting_tasks = task_queue(mode='large', lab=None, one=one) +def run_large(): + try: + one = ONE(cache_rest=None) + waiting_tasks = task_queue(mode='large', lab=None, one=one) - if len(waiting_tasks) == 0: - _logger.info(f"No large tasks in the queue, retrying in {int(sleep_time / 60)} min") - # Query again only in 60min if queue is empty - time.sleep(sleep_time) - else: - tdict = waiting_tasks[0] - _logger.info(f"Running task {tdict['name']} for session {tdict['session']}") - ses = one.alyx.rest('sessions', 'list', django=f"pk,{tdict['session']}")[0] - session_path = Path(subjects_path).joinpath( - Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3))) - run_alyx_task(tdict=tdict, session_path=session_path, one=one) -except BaseException: - _logger.error(f"Error running large task queue \n {traceback.format_exc()}") - time.sleep(int(sleep_time / 2)) + if len(waiting_tasks) == 0: + _logger.info(f"No large tasks in the queue, retrying in {int(sleep_time / 60)} min") + # Query again only in 60min if queue is empty + time.sleep(sleep_time) + else: + tdict = waiting_tasks[0] + _logger.info(f"Running task {tdict['name']} for session {tdict['session']}") + ses = one.alyx.rest('sessions', 'list', django=f"pk,{tdict['session']}")[0] + session_path = Path(subjects_path).joinpath( + Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3))) + run_alyx_task(tdict=tdict, session_path=session_path, one=one) + except BaseException: + _logger.error(f"Error running large task queue \n {traceback.format_exc()}") + time.sleep(int(sleep_time / 2)) + + +if __name__ == '__main__': + multiprocessing.freeze_support() + run_large() From 35ab9029b0cab2c7a7c39760a9654fe50ea09e66 Mon Sep 17 00:00:00 2001 From: Mayo Faulkner Date: Mon, 27 Feb 2023 09:46:14 +0000 Subject: [PATCH 2/2] new one.alyx --- deploy/serverpc/crontab/large_jobs.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deploy/serverpc/crontab/large_jobs.py b/deploy/serverpc/crontab/large_jobs.py index e757f3ea..0d69b0cb 100644 --- a/deploy/serverpc/crontab/large_jobs.py +++ b/deploy/serverpc/crontab/large_jobs.py @@ -12,10 +12,11 @@ subjects_path = Path('/mnt/s0/Data/Subjects/') sleep_time = 3600 + def run_large(): try: one = ONE(cache_rest=None) - waiting_tasks = task_queue(mode='large', lab=None, one=one) + waiting_tasks = task_queue(mode='large', lab=None, alyx=one.alyx) if len(waiting_tasks) == 0: _logger.info(f"No large tasks in the queue, retrying in {int(sleep_time / 60)} min")