Skip to content

Commit 76c4de0

Browse files
committed
add an option to raise within tasks
1 parent faecc64 commit 76c4de0

File tree

1 file changed

+17
-2
lines changed

1 file changed

+17
-2
lines changed

ibllib/pipes/tasks.py

+17-2
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,10 @@ class Task(abc.ABC):
112112
force = False # whether to re-download missing input files on local server if not present
113113
job_size = 'small' # either 'small' or 'large', defines whether task should be run as part of the large or small job services
114114
env = None # the environment name within which to run the task (NB: the env is not activated automatically!)
115+
on_error = 'continue' # whether to raise an exception on error ('raise') or report the error and continue ('continue')
115116

116117
def __init__(self, session_path, parents=None, taskid=None, one=None,
117-
machine=None, clobber=True, location='server', scratch_folder=None, **kwargs):
118+
machine=None, clobber=True, location='server', scratch_folder=None, on_error='continue', **kwargs):
118119
"""
119120
Base task class
120121
:param session_path: session path
@@ -129,6 +130,18 @@ def __init__(self, session_path, parents=None, taskid=None, one=None,
129130
:param scratch_folder: optional: Path where to write intermediate temporary data
130131
:param args: running arguments
131132
"""
133+
self.on_error = on_error
134+
self.log = '' # placeholder to keep the log of the task for registration
135+
self.cpu = kwargs.get('cpu', 1)
136+
self.gpu = kwargs.get('gpu', 0)
137+
self.io_charge = kwargs.get('io_charge', 5)
138+
self.priority = kwargs.get('priority', 30)
139+
self.ram = kwargs.get('ram', 4)
140+
self.level = 0 # level in the pipeline hierarchy: level 0 means there is no parent task
141+
self.outputs = [] # placeholder for a list of Path containing output files
142+
self.time_elapsed_secs = None
143+
self.time_out_secs = 3600 * 2 # time-out after which a task is considered dead
144+
self.version = ibllib.__version__
132145
self.taskid = taskid
133146
self.one = one
134147
self.session_path = session_path
@@ -263,10 +276,12 @@ def run(self, **kwargs):
263276
self.outputs = outputs if not self.outputs else self.outputs # ensure None if no inputs registered
264277
else:
265278
self.outputs.extend(ensure_list(outputs)) # Add output files to list of inputs to register
266-
except Exception:
279+
except Exception as e:
267280
_logger.error(traceback.format_exc())
268281
_logger.info(f'Job {self.__class__} errored')
269282
self.status = -1
283+
if self.on_error == 'raise':
284+
raise e
270285

271286
self.time_elapsed_secs = time.time() - start_time
272287
# log the outputs

0 commit comments

Comments
 (0)