From 0cc4effb6b848611e8609d1f660de320a598843d Mon Sep 17 00:00:00 2001
From: Johannes Koester
Date: Wed, 6 Sep 2023 11:20:04 +0200
Subject: [PATCH] fixes

---
 .../__init__.py  | 116 ++++++++++++++----
 tests/sidecar.sh |  20 +++
 tests/tests.py   |   9 +-
 3 files changed, 117 insertions(+), 28 deletions(-)
 create mode 100755 tests/sidecar.sh

diff --git a/snakemake_executor_plugin_cluster_generic/__init__.py b/snakemake_executor_plugin_cluster_generic/__init__.py
index 035ae01..41cd644 100644
--- a/snakemake_executor_plugin_cluster_generic/__init__.py
+++ b/snakemake_executor_plugin_cluster_generic/__init__.py
@@ -2,7 +2,10 @@
 import os
 import shlex
 import subprocess
-from typing import Generator, List, Optional, Set
+import sys
+import threading
+import time
+from typing import Generator, List, Optional
 
 from snakemake_interface_common.exceptions import WorkflowError
 from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo
@@ -21,21 +24,29 @@
 # Omit this class if you don't need any.
 @dataclass
 class ExecutorSettings(ExecutorSettingsBase):
-    submit_cmd: Optional[str] = field(default=None, metadata={"help": "Command for submitting jobs"})
-    status_cmd: Optional[str] = field(default=None, metadata={"help": "Command for retrieving job status"})
+    submit_cmd: Optional[str] = field(
+        default=None, metadata={"help": "Command for submitting jobs"}
+    )
+    status_cmd: Optional[str] = field(
+        default=None, metadata={"help": "Command for retrieving job status"}
+    )
     cancel_cmd: Optional[str] = field(
         default=None,
         metadata={
-            "help": "Command for cancelling jobs. Expected to take one or more jobids as arguments."
-        }
+            "help": "Command for cancelling jobs. Expected to take one or more "
+            "jobids as arguments."
+        },
     )
     cancel_nargs: int = field(
         default=20,
         metadata={
-            "help": "Number of jobids to pass to cancel_cmd. If more are given, cancel_cmd will be called multiple times."
-        }
+            "help": "Number of jobids to pass to cancel_cmd. If more are given, "
+            "cancel_cmd will be called multiple times."
+        },
+    )
+    sidecar_cmd: Optional[str] = field(
+        default=None, metadata={"help": "Command for sidecar process."}
     )
-    sidecar_cmd: Optional[str] = field(default=None, metadata={"help": "Command for sidecar process."})
 
 
 # Required:
@@ -62,9 +73,12 @@ def __init__(
             workflow,
             logger,
             # configure behavior of RemoteExecutor below
-            pass_default_remote_provider_args=True,  # whether arguments for setting the remote provider shall be passed to jobs
-            pass_default_resources_args=True,  # whether arguments for setting default resources shall be passed to jobs
-            pass_envvar_declarations_to_cmd=True,  # whether environment variables shall be passed to jobs
+            # whether arguments for setting the remote provider shall be passed to jobs
+            pass_default_remote_provider_args=True,
+            # whether arguments for setting default resources shall be passed to jobs
+            pass_default_resources_args=True,
+            # whether environment variables shall be passed to jobs
+            pass_envvar_declarations_to_cmd=True,
         )
 
         if self.workflow.executor_settings.submit_cmd is None:
@@ -89,12 +103,6 @@ def __init__(
         self.external_jobid = dict()
 
     def run_job(self, job: ExecutorJobInterface):
-        # Implement here how to run a job.
-        # You can access the job's resources, etc.
-        # via the job object.
-        # After submitting the job, you have to call self.report_job_submission(job_info).
-        # with job_info being of type snakemake_interface_executor_plugins.executors.base.SubmittedJobInfo.
-
         jobscript = self.get_jobscript(job)
         self.write_jobscript(job, jobscript)
 
@@ -130,7 +138,9 @@ def run_job(self, job: ExecutorJobInterface):
             self.external_jobid[f] for f in job.input if f in self.external_jobid
         )
         try:
-            submitcmd = job.format_wildcards(self.workflow.executor_settings.submit_cmd, dependencies=deps)
+            submitcmd = job.format_wildcards(
+                self.workflow.executor_settings.submit_cmd, dependencies=deps
+            )
         except AttributeError as e:
             raise WorkflowError(str(e), rule=job.rule if not job.is_group() else None)
 
@@ -177,7 +187,9 @@ def run_job(self, job: ExecutorJobInterface):
 
         self.report_job_submission(job_info)
 
-    async def check_active_jobs(self, active_jobs: List[SubmittedJobInfo]) -> Generator[SubmittedJobInfo, None, None]:
+    async def check_active_jobs(
+        self, active_jobs: List[SubmittedJobInfo]
+    ) -> Generator[SubmittedJobInfo, None, None]:
         success = "success"
         failed = "failed"
         running = "running"
@@ -212,8 +224,10 @@ def job_status(
                     self.status_cmd_kills.append(-e.returncode)
                     if len(self.status_cmd_kills) > 10:
                         self.logger.info(
-                            "Cluster status command {} was killed >10 times with signal(s) {} "
-                            "(if this happens unexpectedly during your workflow execution, "
+                            "Cluster status command {} was killed >10 "
+                            "times with signal(s) {} "
+                            "(if this happens unexpectedly during your "
+                            "workflow execution, "
                             "have a closer look.).".format(
                                 self.statuscmd, ",".join(self.status_cmd_kills)
                             )
@@ -228,7 +242,8 @@ def job_status(
             ret = ret.strip().split("\n")
             if len(ret) != 1 or ret[0] not in valid_returns:
                 raise WorkflowError(
-                    "Cluster status command {} returned {} but just a single line with one of {} is expected.".format(
+                    "Cluster status command {} returned {} but just a single "
+                    "line with one of {} is expected.".format(
                         self.statuscmd, "\\n".join(ret), ",".join(valid_returns)
                     )
                 )
@@ -278,7 +293,8 @@ def _chunks(lst, n):
                 yield lst[i : i + n]
 
         if self.cancelcmd:  # We have --cluster-cancel
-            # Enumerate job IDs and create chunks. If cancelnargs evaluates to false (0/None)
+            # Enumerate job IDs and create chunks.
+            # If cancelnargs evaluates to false (0/None)
             # then pass all job ids at once
             jobids = [job_info.aux["external_jobid"] for job_info in active_jobs]
             chunks = list(_chunks(jobids, self.cancelnargs or len(jobids)))
@@ -286,7 +302,8 @@ def _chunks(lst, n):
             failures = 0
             for chunk in chunks:
                 try:
-                    cancel_timeout = 2  # rather fail on timeout than miss canceling all
+                    # rather fail on timeout than miss canceling all
+                    cancel_timeout = 2
                     env = dict(os.environ)
                     if self.sidecar_vars:
                         env["SNAKEMAKE_CLUSTER_SIDECAR_VARS"] = self.sidecar_vars
@@ -301,13 +318,15 @@ def _chunks(lst, n):
             if failures:
                 self.logger.info(
                     (
-                        "{} out of {} calls to --cluster-cancel failed. This is safe to "
+                        "{} out of {} calls to --cluster-cancel failed. "
+                        "This is safe to "
                         "ignore in most cases."
                     ).format(failures, len(chunks))
                 )
         else:
             self.logger.info(
-                "No --cluster-cancel given. Will exit after finishing currently running jobs."
+                "No --cluster-cancel given. Will exit after "
+                "finishing currently running jobs."
             )
         )
         self.shutdown()
 
@@ -334,4 +353,47 @@ def get_jobfinished_marker(self, job: ExecutorJobInterface):
         return os.path.join(self.tmpdir, f"{job.jobid}.jobfinished")
 
     def get_jobfailed_marker(self, job: ExecutorJobInterface):
-        return os.path.join(self.tmpdir, f"{job.jobid}.jobfailed")
\ No newline at end of file
+        return os.path.join(self.tmpdir, f"{job.jobid}.jobfailed")
+
+    def _launch_sidecar(self):
+        def copy_stdout(executor, process):
+            """Copy the sidecar process's stdout to our stdout."""
+            while process.poll() is None and executor.wait:
+                buf = process.stdout.readline()
+                if buf:
+                    sys.stdout.write(buf)
+            # one final time ...
+            buf = process.stdout.readline()
+            if buf:
+                sys.stdout.write(buf)
+
+        def wait(executor, process):
+            while executor.wait:
+                time.sleep(0.5)
+            process.terminate()
+            process.wait()
+            self.logger.info(
+                "Cluster sidecar process has terminated (retcode=%d)."
+                % process.returncode
+            )
+
+        self.logger.info("Launch sidecar process and read first output line.")
+        process = subprocess.Popen(
+            self.workflow.executor_settings.sidecar_cmd,
+            stdout=subprocess.PIPE,
+            shell=False,
+            encoding="utf-8",
+        )
+        self.sidecar_vars = process.stdout.readline()
+        while self.sidecar_vars and self.sidecar_vars[-1] in "\n\r":
+            self.sidecar_vars = self.sidecar_vars[:-1]
+        self.logger.info("Done reading first output line.")
+
+        thread_stdout = threading.Thread(
+            target=copy_stdout, name="sidecar_stdout", args=(self, process)
+        )
+        thread_stdout.start()
+        thread_wait = threading.Thread(
+            target=wait, name="sidecar_wait", args=(self, process)
+        )
+        thread_wait.start()

diff --git a/tests/sidecar.sh b/tests/sidecar.sh
new file mode 100755
index 0000000..7849e66
--- /dev/null
+++ b/tests/sidecar.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+
+set -ex
+
+echo "FIRST_LINE"
+echo "sidecar started" > sidecar.txt
+sleep infinity &
+pid=$!
+
+catch()
+{
+    set -x
+    kill -TERM $pid || true
+    echo "sidecar stopped" >> sidecar.txt
+    exit 0
+}
+
+trap catch SIGTERM SIGINT
+
+wait

diff --git a/tests/tests.py b/tests/tests.py
index 210eb85..377e163 100644
--- a/tests/tests.py
+++ b/tests/tests.py
@@ -40,4 +40,11 @@ class TestWorkflowsStatusCmd(TestWorkflowsBase):
     __test__ = True
 
     def get_executor_settings(self) -> Optional[ExecutorSettingsBase]:
-        return self._get_executor_settings(status_cmd=self._get_cmd("qstatus.sh"))
\ No newline at end of file
+        return self._get_executor_settings(status_cmd=self._get_cmd("qstatus.sh"))
+
+
+class TestWorkflowsSidecar(TestWorkflowsBase):
+    __test__ = True
+
+    def get_executor_settings(self) -> Optional[ExecutorSettingsBase]:
+        return self._get_executor_settings(sidecar_cmd=self._get_cmd("sidecar.sh"))
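
Note on the sidecar contract wired up above (reviewer commentary, not part of the patch): _launch_sidecar starts sidecar_cmd, captures the first line the process writes to stdout (trailing newline stripped) into self.sidecar_vars, and cancel_jobs exports that value as SNAKEMAKE_CLUSTER_SIDECAR_VARS when invoking cancel_cmd. Assuming the status path forwards the variable in the same way, and that the external job id arrives as the first argument (neither invocation is shown in this diff), a status script could consume it roughly as follows. This is a minimal sketch: "my-scheduler-ctl" and its session-token semantics are made up for illustration; the only firm requirement, per job_status above, is that the script prints exactly one line reading success, failed, or running.

    #!/bin/bash
    # Hypothetical status_cmd script (sketch only).
    # $1 is assumed to be the external job id; the sidecar is assumed to have
    # printed a session token as its first stdout line, which the executor
    # exposes via SNAKEMAKE_CLUSTER_SIDECAR_VARS (shown for cancel_cmd above).
    jobid="$1"
    state=$(my-scheduler-ctl --session "$SNAKEMAKE_CLUSTER_SIDECAR_VARS" state "$jobid")
    case "$state" in
        COMPLETED) echo success ;;
        FAILED|CANCELLED) echo failed ;;
        *) echo running ;;
    esac

tests/sidecar.sh above deliberately exercises only the lifecycle (the FIRST_LINE handshake and clean termination on SIGTERM); a real deployment would pair sidecar_cmd with status_cmd and cancel_cmd scripts along these lines.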