From da3f58c166171269bce04607a695ac7492c5c728 Mon Sep 17 00:00:00 2001 From: William Silversmith Date: Wed, 31 Jan 2024 18:29:23 -0500 Subject: [PATCH] feat(cli): add -n to execute a certain number of tasks --- igneous_cli/cli.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/igneous_cli/cli.py b/igneous_cli/cli.py index c586d0f..fde9a23 100644 --- a/igneous_cli/cli.py +++ b/igneous_cli/cli.py @@ -759,11 +759,12 @@ def ccl_auto( @click.option('--min-sec', default=-1, help='Execute for at least this many seconds and quit after the last task finishes. Special values: (0) Run at most a single task. (-1) Loop forever (default).', type=float) @click.option('-q', '--quiet', is_flag=True, default=False, help='Suppress task status messages.', show_default=True) @click.option('-x', '--exit-on-empty', is_flag=True, default=False, help="Exit immediately when the queue is empty. Not reliable for SQS as measurements are approximate.", show_default=True) +@click.option('-n', '--num-tasks', type=int, default=-1, help="Process this many tasks. Use -1 to indicate no limit.", show_default=True) @click.pass_context def execute( ctx, queue, aws_region, lease_sec, tally, min_sec, - exit_on_empty, quiet + exit_on_empty, quiet, num_tasks, ): """Execute igneous tasks from a queue. @@ -774,7 +775,7 @@ def execute( See https://github.com/seung-lab/python-task-queue """ parallel = int(ctx.obj.get("parallel", 1)) - args = (queue, aws_region, lease_sec, tally, min_sec, exit_on_empty, quiet) + args = (queue, aws_region, lease_sec, tally, min_sec, exit_on_empty, quiet, num_tasks) parallel_execute_helper(parallel, args) def parallel_execute_helper(parallel, args): @@ -796,7 +797,7 @@ def parallel_execute_helper(parallel, args): def execute_helper( queue, aws_region, lease_sec, tally, min_sec, exit_on_empty, - quiet + quiet, num_tasks, ): tq = TaskQueue(normalize_path(queue), region_name=aws_region) @@ -832,10 +833,13 @@ def is_empty(): else: return tq.is_empty() - def stop_after_elapsed_time(tries, elapsed_time): + def stop_after_elapsed_time(tries, executed, elapsed_time): if exit_on_empty and is_empty(): return True + if num_tasks >= 0 and executed >= num_tasks: + return True + if min_sec < 0: return False return min_sec < elapsed_time