Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ def spawn_if_parentless(self, tdef, point, flow_nums):
def remove(self, itask: 'TaskProxy', reason: Optional[str] = None) -> None:
"""Remove a task from the pool."""
# the held state is no longer relevant -> remove it
self.release_held_active_task(itask)
self.release_held_task(itask)

# xtriggers are no longer relevant -> remove them
self.xtrigger_mgr.force_satisfy_all(itask, log=False)
Expand Down Expand Up @@ -1385,7 +1385,19 @@ def hold_active_task(self, itask: TaskProxy) -> None:
self.tasks_to_hold.add((itask.tdef.name, itask.point))
self.workflow_db_mgr.put_tasks_to_hold(self.tasks_to_hold)

def release_held_active_task(self, itask: TaskProxy) -> None:
def release_held_task(self, itask: TaskProxy) -> None:
"""Release a task from hold.

The given task proxy may be from the active pool, or a
transient representing a future task (targeted by `cylc set`
to set its outputs and thereby spawn children - and we release
held tasks if their outputs are set).

Note in the transient case the proxy will not be is_held but
this method removes it from the tasks_to_hold list below.
(This is a bit confused, could do with refactoring).

"""
if itask.state_reset(is_held=False):
self.data_store_mgr.delta_task_state(itask)
if (not itask.state.is_runahead) and itask.is_ready_to_run():
Expand Down Expand Up @@ -1436,7 +1448,7 @@ def release_held_tasks(self, items: Set[TaskTokens]) -> int:
itask = self._get_task_by_id(id_.relative_id)
if itask:
# release active task
self.release_held_active_task(itask)
self.release_held_task(itask)
else:
# release inactive task
self.data_store_mgr.delta_task_held(
Expand All @@ -1453,7 +1465,7 @@ def release_hold_point(self) -> None:
"""Unset the workflow hold point and release all held active tasks."""
self.hold_point = None
for itask in self.get_tasks():
self.release_held_active_task(itask)
self.release_held_task(itask)
self.tasks_to_hold.clear()
self.workflow_db_mgr.put_tasks_to_hold(self.tasks_to_hold)
self.workflow_db_mgr.put_workflow_hold_cycle_point(None)
Expand Down
Loading