Merge remote-tracking branch 'upstream/master' into feature.skip_mode
* upstream/master:
  Update cylc/flow/rundb.py
  GH Actions shortlog: check out head ref
  Add Paul Armstrong to CONTRIBUTING.md
  Report the error if write attempts fail to the DB
  Functional tests: allow `grep_fail` to accept options (cylc#6463)
  Bump dev version
  Prepare release 8.3.6
  Bump pypa/gh-action-pypi-publish from 1.11.0 to 1.12.2
  Simplify task state deltas
  Refactor task killing
  Bump pypa/gh-action-pypi-publish from 1.10.3 to 1.11.0 (cylc#6455) [skip ci]
  Spawn parentless sequential xtriggered task on set outputs (cylc#6448)
  Delete changes.d/6404.fix.md
  Pass pre-install plugins the value of workflow rundir as options.against source, allowing re-install and validate against source to access previous CLI opts as stored in rose-suite-cylc-install.conf
wxtim committed Nov 12, 2024
2 parents 48c69fb + 4686f5f commit 6eaab2c
Showing 37 changed files with 284 additions and 209 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/2_auto_publish_release.yml
@@ -38,7 +38,7 @@ jobs:
uses: cylc/release-actions/build-python-package@v1

- name: Publish distribution to PyPI
uses: pypa/gh-action-pypi-publish@v1.10.3
uses: pypa/gh-action-pypi-publish@v1.12.2
with:
user: __token__ # uses the API token feature of PyPI - least permissions possible
password: ${{ secrets.PYPI_TOKEN }}
1 change: 1 addition & 0 deletions .github/workflows/shortlog.yml
@@ -17,6 +17,7 @@ jobs:
uses: actions/checkout@v4
with:
fetch-depth: 0 # need to fetch all commits to check contributors
ref: ${{ github.event.pull_request.head.sha }}

- name: Check CONTRIBUTING.md
uses: cylc/release-actions/check-shortlog@v1
28 changes: 28 additions & 0 deletions CHANGES.md
@@ -11,6 +11,34 @@ $ towncrier create <PR-number>.<break|feat|fix>.md --content "Short description"

<!-- towncrier release notes start -->

## __cylc-8.3.6 (Released 2024-11-07)__

### 🔧 Fixes

[#4983](https://github.com/cylc/cylc-flow/pull/4983) - Ensure the runahead limit is recomputed when legacy "suicide-triggers" are used, to prevent erroneous stall in niche cases.

[#6263](https://github.com/cylc/cylc-flow/pull/6263) - Fix bug that prevented changes to user-defined xtriggers taking effect after a reload.

[#6326](https://github.com/cylc/cylc-flow/pull/6326) - Fix a rare issue where missing job records could cause tasks to become stuck in active states.

[#6364](https://github.com/cylc/cylc-flow/pull/6364) - Fixed bug where `cylc clean <workflow> --rm share` would not take care of removing the target of the `share/cycle` symlink directory.

[#6376](https://github.com/cylc/cylc-flow/pull/6376) - Fixes an issue that could cause Cylc to ignore the remaining hosts in a platform in response to an `ssh` error in some niche circumstances.

[#6388](https://github.com/cylc/cylc-flow/pull/6388) - Fix task state filtering in Tui.

[#6414](https://github.com/cylc/cylc-flow/pull/6414) - Broadcast will now reject truncated cycle points to avoid runtime errors.

[#6422](https://github.com/cylc/cylc-flow/pull/6422) - Enabled jumping to the top/bottom of log files in Tui using the "home" and "end" keys.

[#6431](https://github.com/cylc/cylc-flow/pull/6431) - The `cycle point format` was imposing an undesirable constraint on `wall_clock` offsets; this has been fixed.

[#6433](https://github.com/cylc/cylc-flow/pull/6433) - Ignore requests to trigger or set active tasks with --flow=none.

[#6445](https://github.com/cylc/cylc-flow/pull/6445) - Ensure `cylc trigger` does not fall back to `flow=none` when there are no active flows.

[#6448](https://github.com/cylc/cylc-flow/pull/6448) - Fix the non-spawning of parentless sequential xtriggered tasks when outputs are set.

## __cylc-8.3.5 (Released 2024-10-15)__

### 🔧 Fixes
3 changes: 2 additions & 1 deletion CONTRIBUTING.md
@@ -29,7 +29,7 @@ We use [semver](https://semver.org/) to separate riskier changes (e.g. new featu
(e.g. 8.1, 8.2, 8.3)

**Bugfixes** and minor usability enhancements are made on bugfix branches and
released as the next maintainance version (e.g. 8.0.1, 8.0.2, 8.0.3). E.G. if the issue is on the `8.0.x` milestone, branch off of `8.0.x` to
released as the next maintenance version (e.g. 8.0.1, 8.0.2, 8.0.3). E.G. if the issue is on the `8.0.x` milestone, branch off of `8.0.x` to
develop your bugfix, then raise the pull request against the `8.0.x` branch. We will later merge the `8.0.x` branch into `master`.

Feel free to ask questions on the issue or
@@ -96,6 +96,7 @@ requests_).
- Diquan Jabbour
- Shixian Sheng
- Utheri Wagura
- Paul Armstrong
- Paul Earnshaw
<!-- end-shortlog -->

1 change: 0 additions & 1 deletion changes.d/4983.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6263.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6326.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6364.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6376.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6388.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6414.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6422.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6431.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6433.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6445.fix.md

This file was deleted.

21 changes: 8 additions & 13 deletions cylc/flow/commands.py
@@ -78,8 +78,6 @@
from cylc.flow.parsec.exceptions import ParsecError
from cylc.flow.run_modes import RunMode
from cylc.flow.task_id import TaskID
from cylc.flow.task_state import (
TASK_STATUSES_ACTIVE, TASK_STATUS_FAILED)
from cylc.flow.workflow_status import StopMode

from metomi.isodatetime.parsers import TimePointParser
@@ -258,19 +256,16 @@ async def poll_tasks(schd: 'Scheduler', tasks: Iterable[str]):

@_command('kill_tasks')
async def kill_tasks(schd: 'Scheduler', tasks: Iterable[str]):
"""Kill all tasks or a task/family if options are provided."""
"""Kill tasks.
Args:
tasks: Tasks/families/globs to kill.
"""
validate.is_tasks(tasks)
yield
itasks, _, bad_items = schd.pool.filter_task_proxies(tasks)
if schd.get_run_mode() == RunMode.SIMULATION:
for itask in itasks:
if itask.state(*TASK_STATUSES_ACTIVE):
itask.state_reset(TASK_STATUS_FAILED)
schd.data_store_mgr.delta_task_state(itask)
yield len(bad_items)
else:
schd.task_job_mgr.kill_task_jobs(schd.workflow, itasks)
yield len(bad_items)
active, _, unmatched = schd.pool.filter_task_proxies(tasks)
num_unkillable = schd.kill_tasks(active)
yield len(unmatched) + num_unkillable


@_command('hold')
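The `commands.py` hunk above moves the simulation-mode special case out of the command layer: the command now just filters the requested IDs and delegates to the scheduler. Below is a small self-contained sketch of that flow using mock objects; the mock classes and example task IDs are invented for illustration, and only the filter/delegate/tally shape comes from the diff.

```python
# Self-contained sketch of the refactored kill flow. MockScheduler is a
# stand-in for cylc.flow.scheduler.Scheduler, not real cylc code.
import asyncio


class MockScheduler:
    class pool:
        @staticmethod
        def filter_task_proxies(tasks):
            # Pretend only '1/foo' exists in the task pool.
            matched = [t for t in tasks if t == "1/foo"]
            unmatched = [t for t in tasks if t != "1/foo"]
            return matched, [], unmatched

    @staticmethod
    def kill_tasks(itasks):
        # Pretend every matched task was in a killable state.
        return 0


async def kill_tasks(schd, tasks):
    """Condensed shape of the new command: filter, delegate, tally."""
    yield  # command accepted
    active, _, unmatched = schd.pool.filter_task_proxies(tasks)
    num_unkillable = schd.kill_tasks(active)
    yield len(unmatched) + num_unkillable  # items the command could not act on


async def main():
    cmd = kill_tasks(MockScheduler(), ["1/foo", "2/bar"])
    await cmd.__anext__()         # command accepted
    print(await cmd.__anext__())  # -> 1 (the unmatched '2/bar')


asyncio.run(main())
```

The real command also calls `validate.is_tasks(tasks)` before the first yield, as shown in the diff.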
115 changes: 41 additions & 74 deletions cylc/flow/data_store_mgr.py
@@ -67,7 +67,6 @@
Set,
TYPE_CHECKING,
Tuple,
Union,
)
import zlib

@@ -2276,7 +2275,7 @@ def _generate_broadcast_node_deltas(self, node_data, node_type):
# -----------
# Task Deltas
# -----------
def delta_task_state(self, itask):
def delta_task_state(self, itask: 'TaskProxy') -> None:
"""Create delta for change in task proxy state.
Args:
@@ -2292,34 +2291,47 @@ def delta_task_state(self, itask):
update_time = time()

# update task instance
tp_delta = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id))
tp_delta: PbTaskProxy = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id)
)
tp_delta.stamp = f'{tp_id}@{update_time}'
tp_delta.state = itask.state.status
for field in ('is_held', 'is_queued', 'is_runahead'):
val = getattr(itask.state, field)
if (
# only update the fields that have changed compared to store:
getattr(tproxy, field) != val
# or changed since earlier delta that is not yet sent:
or getattr(tp_delta, field) != val
):
setattr(tp_delta, field, val)
if (
tproxy.state != itask.state.status
or tp_delta.state != itask.state.status
):
tp_delta.state = itask.state.status
if tp_delta.state in self.latest_state_tasks:
tp_ref = itask.identity
tp_queue = self.latest_state_tasks[tp_delta.state]
if tp_ref in tp_queue:
tp_queue.remove(tp_ref)
self.latest_state_tasks[tp_delta.state].appendleft(tp_ref)
# if state is final work out new task mean.
if tp_delta.state in TASK_STATUSES_FINAL:
elapsed_time = task_mean_elapsed_time(itask.tdef)
if elapsed_time:
t_id = self.definition_id(tproxy.name)
t_delta = PbTask(
stamp=f'{t_id}@{update_time}',
mean_elapsed_time=elapsed_time
)
self.updated[TASKS].setdefault(
t_id,
PbTask(id=t_id)).MergeFrom(t_delta)
self.state_update_families.add(tproxy.first_parent)
if tp_delta.state in self.latest_state_tasks:
tp_ref = itask.identity
tp_queue = self.latest_state_tasks[tp_delta.state]
if tp_ref in tp_queue:
tp_queue.remove(tp_ref)
self.latest_state_tasks[tp_delta.state].appendleft(tp_ref)
# if state is final work out new task mean.
if tp_delta.state in TASK_STATUSES_FINAL:
elapsed_time = task_mean_elapsed_time(itask.tdef)
if elapsed_time:
t_id = self.definition_id(tproxy.name)
t_delta = PbTask(
stamp=f'{t_id}@{update_time}',
mean_elapsed_time=elapsed_time
)
self.updated[TASKS].setdefault(
t_id,
PbTask(id=t_id)).MergeFrom(t_delta)
self.updates_pending = True

def delta_task_held(
self,
itask: Union[TaskProxy, Tuple[str, 'PointBase', bool]]
self, name: str, cycle: 'PointBase', is_held: bool
) -> None:
"""Create delta for change in task proxy held state.
@@ -2329,15 +2341,10 @@ def delta_task_held(
(name, cycle, is_held).
"""
if isinstance(itask, TaskProxy):
tokens = itask.tokens
is_held = itask.state.is_held
else:
name, cycle, is_held = itask
tokens = self.id_.duplicate(
task=name,
cycle=str(cycle),
)
tokens = self.id_.duplicate(
task=name,
cycle=str(cycle),
)
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(tokens)
if not tproxy:
@@ -2349,26 +2356,6 @@
self.state_update_families.add(tproxy.first_parent)
self.updates_pending = True

def delta_task_queued(self, itask: TaskProxy) -> None:
"""Create delta for change in task proxy queued state.
Args:
itask (cylc.flow.task_proxy.TaskProxy):
Update task-node from corresponding task proxy
objects from the workflow task pool.
"""
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
return
tp_delta = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id))
tp_delta.stamp = f'{tp_id}@{time()}'
tp_delta.is_queued = itask.state.is_queued
self.state_update_families.add(tproxy.first_parent)
self.updates_pending = True

def delta_task_flow_nums(self, itask: TaskProxy) -> None:
"""Create delta for change in task proxy flow_nums.
@@ -2388,26 +2375,6 @@ def delta_task_flow_nums(self, itask: TaskProxy) -> None:
tp_delta.flow_nums = serialise_set(itask.flow_nums)
self.updates_pending = True

def delta_task_runahead(self, itask: TaskProxy) -> None:
"""Create delta for change in task proxy runahead state.
Args:
itask (cylc.flow.task_proxy.TaskProxy):
Update task-node from corresponding task proxy
objects from the workflow task pool.
"""
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
return
tp_delta = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id))
tp_delta.stamp = f'{tp_id}@{time()}'
tp_delta.is_runahead = itask.state.is_runahead
self.state_update_families.add(tproxy.first_parent)
self.updates_pending = True

def delta_task_output(
self,
itask: TaskProxy,
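In the `data_store_mgr.py` changes above, the separate `delta_task_queued` and `delta_task_runahead` methods are removed and `delta_task_state` now covers `is_held`, `is_queued` and `is_runahead`, writing a field into the pending delta only when it differs from the stored node or from an earlier, not-yet-sent delta. A minimal stand-alone illustration of that comparison pattern, using plain dataclasses rather than the protobuf types in the diff:

```python
# Simplified illustration of the "only update changed fields" delta pattern;
# Node is a stand-in for the PbTaskProxy messages used in the real code.
from dataclasses import dataclass


@dataclass
class Node:
    state: str = "waiting"
    is_held: bool = False
    is_queued: bool = False
    is_runahead: bool = False


def update_delta(stored: Node, delta: Node, live: Node) -> None:
    for name in ("is_held", "is_queued", "is_runahead", "state"):
        val = getattr(live, name)
        # Copy only if it differs from the store or from an unsent delta.
        if getattr(stored, name) != val or getattr(delta, name) != val:
            setattr(delta, name, val)


stored, delta = Node(), Node()
update_delta(stored, delta, Node(state="submitted", is_queued=True))
print(delta.state, delta.is_queued)  # submitted True
```

Skipping unchanged fields keeps each delta small, which appears to be the point of the "only update the fields that have changed" comments in the diff.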
11 changes: 8 additions & 3 deletions cylc/flow/rundb.py
@@ -481,7 +481,7 @@ def execute_queued_items(self):

# something went wrong
# (includes DB file not found, transaction processing issue, db locked)
except sqlite3.Error:
except sqlite3.Error as e:
if not self.is_public:
# incase this isn't a filesystem issue, log the statements
# which make up the transaction to assist debug
@@ -493,8 +493,13 @@ def execute_queued_items(self):
raise
self.n_tries += 1
LOG.warning(
"%(file)s: write attempt (%(attempt)d) did not complete\n" % {
"file": self.db_file_name, "attempt": self.n_tries})
"%(file)s: write attempt (%(attempt)d)"
" did not complete: %(error)s\n" % {
"file": self.db_file_name,
"attempt": self.n_tries,
"error": str(e)
}
)
if self.conn is not None:
with suppress(sqlite3.Error):
self.conn.rollback()
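The `rundb.py` change above reports the caught `sqlite3.Error` in the retry warning instead of discarding it. A self-contained sketch of the resulting message; the workflow path and attempt number are invented and the error is provoked artificially, but the message template matches the diff:

```python
# Sketch only: reproduce the shape of the new warning message.
import sqlite3

db_file_name, n_tries = "~/cylc-run/my-flow/.service/db", 1  # invented values

try:
    # Force a representative sqlite3.Error: the parent directory is missing.
    sqlite3.connect("/nonexistent/dir/db")
except sqlite3.Error as e:
    print(
        "%(file)s: write attempt (%(attempt)d)"
        " did not complete: %(error)s" % {
            "file": db_file_name,
            "attempt": n_tries,
            "error": str(e),
        }
    )
    # e.g. "~/cylc-run/my-flow/.service/db: write attempt (1) did not
    # complete: unable to open database file"
```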
49 changes: 45 additions & 4 deletions cylc/flow/scheduler.py
@@ -128,12 +128,14 @@
REMOTE_INIT_FAILED,
)
from cylc.flow.task_state import (
TASK_STATUSES_ACTIVE,
TASK_STATUSES_NEVER_ACTIVE,
TASK_STATUS_FAILED,
TASK_STATUS_PREPARING,
TASK_STATUS_RUNNING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_WAITING)
TASK_STATUS_WAITING,
TASK_STATUSES_ACTIVE,
TASK_STATUSES_NEVER_ACTIVE,
)
from cylc.flow.templatevars import get_template_vars
from cylc.flow.timer import Timer
from cylc.flow.util import cli_format
@@ -145,12 +147,15 @@
from cylc.flow.xtrigger_mgr import XtriggerManager

if TYPE_CHECKING:
from optparse import Values

# BACK COMPAT: typing_extensions.Literal
# FROM: Python 3.7
# TO: Python 3.8
from typing_extensions import Literal
from optparse import Values

from cylc.flow.network.resolvers import TaskMsg
from cylc.flow.task_proxy import TaskProxy


class SchedulerStop(CylcError):
@@ -1012,6 +1017,42 @@ def _set_stop(self, stop_mode: Optional[StopMode] = None) -> None:
self.stop_mode = stop_mode
self.update_data_store()

def kill_tasks(
self, itasks: 'Iterable[TaskProxy]', warn: bool = True
) -> int:
"""Kill tasks if they are in a killable state.
Args:
itasks: Tasks to kill.
warn: Whether to warn about tasks that are not in a killable state.
Returns number of tasks that could not be killed.
"""
jobless = self.get_run_mode() == RunMode.SIMULATION
to_kill: List[TaskProxy] = []
unkillable: List[TaskProxy] = []
for itask in itasks:
if itask.state(*TASK_STATUSES_ACTIVE):
itask.state_reset(
# directly reset to failed in sim mode, else let
# task_job_mgr handle it
status=(TASK_STATUS_FAILED if jobless else None),
is_held=True,
)
self.data_store_mgr.delta_task_state(itask)
to_kill.append(itask)
else:
unkillable.append(itask)
if warn and unkillable:
LOG.warning(
"Tasks not killable: "
f"{', '.join(sorted(t.identity for t in unkillable))}"
)
if not jobless:
self.task_job_mgr.kill_task_jobs(self.workflow, to_kill)

return len(unkillable)

def get_restart_num(self) -> int:
"""Return the number of the restart, else 0 if not a restart.
(Diffs for the remaining changed files are not shown.)
