
Commit

fixes
johanneskoester committed Sep 6, 2023
1 parent 7b5a66f commit 0cc4eff
Showing 3 changed files with 117 additions and 28 deletions.
116 changes: 89 additions & 27 deletions snakemake_executor_plugin_cluster_generic/__init__.py
@@ -2,7 +2,10 @@
 import os
 import shlex
 import subprocess
-from typing import Generator, List, Optional, Set
+import sys
+import threading
+import time
+from typing import Generator, List, Optional
 
 from snakemake_interface_common.exceptions import WorkflowError
 from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo
@@ -21,21 +24,29 @@
 # Omit this class if you don't need any.
 @dataclass
 class ExecutorSettings(ExecutorSettingsBase):
-    submit_cmd: Optional[str] = field(default=None, metadata={"help": "Command for submitting jobs"})
-    status_cmd: Optional[str] = field(default=None, metadata={"help": "Command for retrieving job status"})
+    submit_cmd: Optional[str] = field(
+        default=None, metadata={"help": "Command for submitting jobs"}
+    )
+    status_cmd: Optional[str] = field(
+        default=None, metadata={"help": "Command for retrieving job status"}
+    )
     cancel_cmd: Optional[str] = field(
         default=None,
         metadata={
-            "help": "Command for cancelling jobs. Expected to take one or more jobids as arguments."
-        }
+            "help": "Command for cancelling jobs. Expected to take one or more "
+            "jobids as arguments."
+        },
     )
     cancel_nargs: int = field(
         default=20,
         metadata={
-            "help": "Number of jobids to pass to cancel_cmd. If more are given, cancel_cmd will be called multiple times."
-        }
+            "help": "Number of jobids to pass to cancel_cmd. If more are given, "
+            "cancel_cmd will be called multiple times."
+        },
     )
-    sidecar_cmd: Optional[str] = field(default=None, metadata={"help": "Command for sidecar process."})
+    sidecar_cmd: Optional[str] = field(
+        default=None, metadata={"help": "Command for sidecar process."}
+    )
 
 
 # Required:
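
For illustration, a fully populated settings object for this plugin might look as follows. This is only a sketch: the command strings are made up, and in practice the values come from the corresponding command line options.

    # uses the ExecutorSettings dataclass defined in the diff above
    settings = ExecutorSettings(
        submit_cmd="qsub -q main",   # hypothetical submission command
        status_cmd="qstatus.sh",     # hypothetical status command
        cancel_cmd="qdel",           # hypothetical cancel command
        cancel_nargs=20,             # pass at most 20 job ids per cancel call
        sidecar_cmd="./sidecar.sh",  # hypothetical sidecar command
    )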
Expand All @@ -62,9 +73,12 @@ def __init__(
workflow,
logger,
# configure behavior of RemoteExecutor below
pass_default_remote_provider_args=True, # whether arguments for setting the remote provider shall be passed to jobs
pass_default_resources_args=True, # whether arguments for setting default resources shall be passed to jobs
pass_envvar_declarations_to_cmd=True, # whether environment variables shall be passed to jobs
# whether arguments for setting the remote provider shall be passed to jobs
pass_default_remote_provider_args=True,
# whether arguments for setting default resources shall be passed to jobs
pass_default_resources_args=True,
# whether environment variables shall be passed to jobs
pass_envvar_declarations_to_cmd=True,
)

if self.workflow.executor_settings.submit_cmd is None:
@@ -89,12 +103,6 @@ def __init__(
         self.external_jobid = dict()
 
     def run_job(self, job: ExecutorJobInterface):
-        # Implement here how to run a job.
-        # You can access the job's resources, etc.
-        # via the job object.
-        # After submitting the job, you have to call self.report_job_submission(job_info).
-        # with job_info being of type snakemake_interface_executor_plugins.executors.base.SubmittedJobInfo.
-
         jobscript = self.get_jobscript(job)
         self.write_jobscript(job, jobscript)
 
@@ -130,7 +138,9 @@ def run_job(self, job: ExecutorJobInterface):
             self.external_jobid[f] for f in job.input if f in self.external_jobid
         )
         try:
-            submitcmd = job.format_wildcards(self.workflow.executor_settings.submit_cmd, dependencies=deps)
+            submitcmd = job.format_wildcards(
+                self.workflow.executor_settings.submit_cmd, dependencies=deps
+            )
         except AttributeError as e:
             raise WorkflowError(str(e), rule=job.rule if not job.is_group() else None)
 
@@ -177,7 +187,9 @@ def run_job(self, job: ExecutorJobInterface):
 
         self.report_job_submission(job_info)
 
-    async def check_active_jobs(self, active_jobs: List[SubmittedJobInfo]) -> Generator[SubmittedJobInfo, None, None]:
+    async def check_active_jobs(
+        self, active_jobs: List[SubmittedJobInfo]
+    ) -> Generator[SubmittedJobInfo, None, None]:
         success = "success"
         failed = "failed"
         running = "running"
@@ -212,8 +224,10 @@ def job_status(
                     self.status_cmd_kills.append(-e.returncode)
                     if len(self.status_cmd_kills) > 10:
                         self.logger.info(
-                            "Cluster status command {} was killed >10 times with signal(s) {} "
-                            "(if this happens unexpectedly during your workflow execution, "
+                            "Cluster status command {} was killed >10 "
+                            "times with signal(s) {} "
+                            "(if this happens unexpectedly during your "
+                            "workflow execution, "
                             "have a closer look.).".format(
                                 self.statuscmd, ",".join(self.status_cmd_kills)
                             )
@@ -228,7 +242,8 @@ def job_status(
             ret = ret.strip().split("\n")
             if len(ret) != 1 or ret[0] not in valid_returns:
                 raise WorkflowError(
-                    "Cluster status command {} returned {} but just a single line with one of {} is expected.".format(
+                    "Cluster status command {} returned {} but just a single "
+                    "line with one of {} is expected.".format(
                         self.statuscmd, "\\n".join(ret), ",".join(valid_returns)
                     )
                 )
@@ -278,15 +293,17 @@ def _chunks(lst, n):
             yield lst[i : i + n]
 
         if self.cancelcmd:  # We have --cluster-cancel
-            # Enumerate job IDs and create chunks. If cancelnargs evaluates to false (0/None)
+            # Enumerate job IDs and create chunks.
+            # If cancelnargs evaluates to false (0/None)
             # then pass all job ids at once
             jobids = [job_info.aux["external_jobid"] for job_info in active_jobs]
             chunks = list(_chunks(jobids, self.cancelnargs or len(jobids)))
             # Go through the chunks and cancel the jobs, warn in case of failures.
             failures = 0
             for chunk in chunks:
                 try:
-                    cancel_timeout = 2  # rather fail on timeout than miss canceling all
+                    # rather fail on timeout than miss canceling all
+                    cancel_timeout = 2
                     env = dict(os.environ)
                     if self.sidecar_vars:
                         env["SNAKEMAKE_CLUSTER_SIDECAR_VARS"] = self.sidecar_vars
@@ -301,13 +318,15 @@ def _chunks(lst, n):
             if failures:
                 self.logger.info(
                     (
-                        "{} out of {} calls to --cluster-cancel failed. This is safe to "
+                        "{} out of {} calls to --cluster-cancel failed. "
+                        "This is safe to "
                         "ignore in most cases."
                     ).format(failures, len(chunks))
                 )
         else:
             self.logger.info(
-                "No --cluster-cancel given. Will exit after finishing currently running jobs."
+                "No --cluster-cancel given. Will exit after "
+                "finishing currently running jobs."
             )
         self.shutdown()

@@ -334,4 +353,47 @@ def get_jobfinished_marker(self, job: ExecutorJobInterface):
         return os.path.join(self.tmpdir, f"{job.jobid}.jobfinished")
 
     def get_jobfailed_marker(self, job: ExecutorJobInterface):
-        return os.path.join(self.tmpdir, f"{job.jobid}.jobfailed")
+        return os.path.join(self.tmpdir, f"{job.jobid}.jobfailed")
+
+    def _launch_sidecar(self):
+        def copy_stdout(executor, process):
+            """Copy the sidecar process's stdout to our stdout."""
+            while process.poll() is None and executor.wait:
+                buf = process.stdout.readline()
+                if buf:
+                    sys.stdout.write(buf)
+            # one final time ...
+            buf = process.stdout.readline()
+            if buf:
+                sys.stdout.write(buf)
+
+        def wait(executor, process):
+            while executor.wait:
+                time.sleep(0.5)
+            process.terminate()
+            process.wait()
+            self.logger.info(
+                "Cluster sidecar process has terminated (retcode=%d)."
+                % process.returncode
+            )
+
+        self.logger.info("Launch sidecar process and read first output line.")
+        process = subprocess.Popen(
+            self.workflow.executor_settings.sidecar_cmd,
+            stdout=subprocess.PIPE,
+            shell=False,
+            encoding="utf-8",
+        )
+        self.sidecar_vars = process.stdout.readline()
+        while self.sidecar_vars and self.sidecar_vars[-1] in "\n\r":
+            self.sidecar_vars = self.sidecar_vars[:-1]
+        self.logger.info("Done reading first output line.")
+
+        thread_stdout = threading.Thread(
+            target=copy_stdout, name="sidecar_stdout", args=(self, process)
+        )
+        thread_stdout.start()
+        thread_wait = threading.Thread(
+            target=wait, name="sidecar_wait", args=(self, process)
+        )
+        thread_wait.start()
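
The protocol behind _launch_sidecar is small: start the sidecar, treat its first stdout line as an opaque variable string, expose that string to subsequent submit/status/cancel commands via SNAKEMAKE_CLUSTER_SIDECAR_VARS, and terminate the process on shutdown. A standalone sketch of that handshake; the sidecar path and the downstream command are placeholders:

    import os
    import subprocess

    # start the sidecar; its first stdout line carries state for later commands
    process = subprocess.Popen(
        ["./sidecar.sh"],  # stand-in for the configured sidecar_cmd
        stdout=subprocess.PIPE,
        encoding="utf-8",
    )
    sidecar_vars = process.stdout.readline().rstrip("\r\n")

    # submit/status/cancel commands see the value via their environment
    env = dict(os.environ)
    env["SNAKEMAKE_CLUSTER_SIDECAR_VARS"] = sidecar_vars
    subprocess.run(["./submit.sh", "jobscript.sh"], env=env)  # placeholder command

    # on shutdown, the executor terminates the sidecar and waits for it
    process.terminate()
    process.wait()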
20 changes: 20 additions & 0 deletions tests/sidecar.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+
+set -ex
+
+echo "FIRST_LINE"
+echo "sidecar started" > sidecar.txt
+sleep infinity &
+pid=$!
+
+catch()
+{
+    set -x
+    kill -TERM $pid || true
+    echo "sidecar stopped" >> sidecar.txt
+    exit 0
+}
+
+trap catch SIGTERM SIGINT
+
+wait
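
For intuition, the script can be driven by hand the same way the executor drives a sidecar. A rough sketch, assuming it is run from the repository root:

    import signal
    import subprocess
    import time

    proc = subprocess.Popen(
        ["bash", "tests/sidecar.sh"], stdout=subprocess.PIPE, encoding="utf-8"
    )
    # handshake: the first stdout line is what the executor would store
    assert proc.stdout.readline().strip() == "FIRST_LINE"

    time.sleep(1)  # give the trap handler time to be installed
    proc.send_signal(signal.SIGTERM)
    proc.wait()

    # the script logs both lifecycle events to ./sidecar.txt
    print(open("sidecar.txt").read())  # sidecar started / sidecar stopped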
9 changes: 8 additions & 1 deletion tests/tests.py
@@ -40,4 +40,11 @@ class TestWorkflowsStatusCmd(TestWorkflowsBase):
     __test__ = True
 
     def get_executor_settings(self) -> Optional[ExecutorSettingsBase]:
-        return self._get_executor_settings(status_cmd=self._get_cmd("qstatus.sh"))
+        return self._get_executor_settings(status_cmd=self._get_cmd("qstatus.sh"))
+
+
+class TestWorkflowsSidecar(TestWorkflowsBase):
+    __test__ = True
+
+    def get_executor_settings(self) -> Optional[ExecutorSettingsBase]:
+        return self._get_executor_settings(sidecar_cmd=self._get_cmd("sidecar.sh"))
