diff --git a/.baseline_version b/.baseline_version new file mode 100644 index 0000000..a9c729a --- /dev/null +++ b/.baseline_version @@ -0,0 +1 @@ +0.4.2-baseline-version diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..5573fb6 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,15 @@ +# EditorConfig is awesome: http://EditorConfig.org + +# top-most EditorConfig file +root = true + +[*] +end_of_line = lf +insert_final_newline = true +charset = utf-8 +indent_style = space +indent_size = 4 + +[*.{yml,yaml}] +indent_style = space +indent_size = 2 diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..74d81cc --- /dev/null +++ b/.gitattributes @@ -0,0 +1,2 @@ +*.smk linguist-language=Python +Snakefile linguist-language=Python diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml new file mode 100644 index 0000000..f0c554b --- /dev/null +++ b/.github/workflows/docs.yml @@ -0,0 +1,18 @@ +name: docs +on: + workflow_dispatch: + push: + paths: + - 'docs/**' + +jobs: + deploy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-python@v2 + with: + python-version: 3.9 + - run: pip install --upgrade pip + - run: pip install -r docs/requirements.txt + - run: mkdocs gh-deploy --force diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml new file mode 100644 index 0000000..ce3a292 --- /dev/null +++ b/.github/workflows/main.yaml @@ -0,0 +1,34 @@ +name: tests + +on: + workflow_dispatch: + push: + branches: + - master + - main + pull_request: + branches_ignore: [] + +jobs: + Dry_Run_and_Lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: docker://snakemake/snakemake:v5.24.2 + - name: Dry Run with test data + run: | + docker run -v $PWD:/opt2 snakemake/snakemake:v5.24.2 \ + /opt2/baseline run --input \ + /opt2/.tests/WT_S1.R1.fastq.gz /opt2/.tests/WT_S1.R2.fastq.gz \ + /opt2/.tests/WT_S2_R1.fastq.gz /opt2/.tests/WT_S2_R2.fastq.gz \ + /opt2/.tests/WT_S3_1.fastq.gz /opt2/.tests/WT_S3_2.fastq.gz \ + /opt2/.tests/WT_S4_R1.001.fastq.gz /opt2/.tests/WT_S4_R2.001.fastq.gz \ + --output /opt2/output --mode local --dry-run + - name: View the pipeline config file + run: | + echo "Generated config file for pipeline...." && cat $PWD/output/config.json + - name: Lint Workflow + continue-on-error: true + run: | + docker run -v $PWD:/opt2 snakemake/snakemake:v5.24.2 snakemake --lint -s /opt2/output/workflow/Snakefile -d /opt2/output || \ + echo 'There may have been a few warnings or errors. Please read through the log to determine if its harmless.' diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a403fea --- /dev/null +++ b/.gitignore @@ -0,0 +1,171 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. 
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site +site/ + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# DS_Store +.DS_Store +._* +**/.DS_Store +**/._* + +.snakemake* +**/.snakemake* +.venv +.venv/* + +# Pipeline Results or Output +results/ +output/ +tmp/ +scratch/ + +# mkdocs documentation +site/ + +# Pipeline generated files or directories +.tests/*/ +.snakemake/ + +# Cached Java directories +.oracle_jre_usage/ +.java/ + +# GNU Parallel +.parallel/ + +# Temp files +*.tmp +**/*.tmp + +# Test script and test +# output directories +test.sh +test_*/ +tmp_*/ diff --git a/.tests/WT_S1.R1.fastq.gz b/.tests/WT_S1.R1.fastq.gz new file mode 100644 index 0000000..e69de29 diff --git a/.tests/WT_S1.R2.fastq.gz b/.tests/WT_S1.R2.fastq.gz new file mode 100644 index 0000000..e69de29 diff --git a/.tests/WT_S2_R1.fastq.gz b/.tests/WT_S2_R1.fastq.gz new file mode 100644 index 0000000..e69de29 diff --git a/.tests/WT_S2_R2.fastq.gz b/.tests/WT_S2_R2.fastq.gz new file mode 100644 index 0000000..e69de29 diff --git a/.tests/WT_S3_1.fastq.gz b/.tests/WT_S3_1.fastq.gz new file mode 100644 index 0000000..e69de29 diff --git a/.tests/WT_S3_2.fastq.gz b/.tests/WT_S3_2.fastq.gz new file mode 100644 index 0000000..e69de29 diff --git a/.tests/WT_S4_R1.001.fastq.gz b/.tests/WT_S4_R1.001.fastq.gz new file mode 100644 index 0000000..e69de29 diff --git a/.tests/WT_S4_R2.001.fastq.gz b/.tests/WT_S4_R2.001.fastq.gz new file mode 100644 index 0000000..e69de29 diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..0b11cb7 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,9 @@ +# Changelog +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+ +## [0.1.0] - 2022-08-22 +### Added + - Recommended [scaffold](https://github.com/OpenOmics/baseline) for building a snakemake pipeline diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d532cd9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2023 OpenOmics + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..1c9e9da --- /dev/null +++ b/README.md @@ -0,0 +1,108 @@ +
+ +

baseline 🔬

+ + **_long pipeline name_** + + [![tests](https://github.com/OpenOmics/baseline/workflows/tests/badge.svg)](https://github.com/OpenOmics/baseline/actions/workflows/main.yaml) [![docs](https://github.com/OpenOmics/baseline/workflows/docs/badge.svg)](https://github.com/OpenOmics/baseline/actions/workflows/docs.yml) [![GitHub issues](https://img.shields.io/github/issues/OpenOmics/baseline?color=brightgreen)](https://github.com/OpenOmics/baseline/issues) [![GitHub license](https://img.shields.io/github/license/OpenOmics/baseline)](https://github.com/OpenOmics/baseline/blob/main/LICENSE) + + + This is the home of the pipeline, baseline. Its long-term goals: to accurately ...insert goal, to infer ...insert goal, and to boldly ...insert goal like no pipeline before! + +
+ +## Overview +Welcome to baseline! Before getting started, we highly recommend reading through [baseline's documentation](https://openomics.github.io/baseline/). + +The **`./baseline`** pipeline is composed several inter-related sub commands to setup and run the pipeline across different systems. Each of the available sub commands perform different functions: + + * [baseline run](https://openomics.github.io/baseline/usage/run/): Run the baseline pipeline with your input files. + * [baseline unlock](https://openomics.github.io/baseline/usage/unlock/): Unlocks a previous runs output directory. + * [baseline install](https://openomics.github.io/baseline/usage/install/): Download reference files locally. + * [baseline cache](https://openomics.github.io/baseline/usage/cache/): Cache remote resources locally, coming soon! + +**baseline** is a comprehensive ...insert long description. It relies on technologies like [Singularity1](https://singularity.lbl.gov/) to maintain the highest-level of reproducibility. The pipeline consists of a series of data processing and quality-control steps orchestrated by [Snakemake2](https://snakemake.readthedocs.io/en/stable/), a flexible and scalable workflow management system, to submit jobs to a cluster. + +The pipeline is compatible with data generated from Illumina short-read sequencing technologies. As input, it accepts a set of FastQ files and can be run locally on a compute instance or on-premise using a cluster. A user can define the method or mode of execution. The pipeline can submit jobs to a cluster using a job scheduler like SLURM (more coming soon!). A hybrid approach ensures the pipeline is accessible to all users. + +Before getting started, we highly recommend reading through the [usage](https://openomics.github.io/baseline/usage/run/) section of each available sub command. + +For more information about issues or trouble-shooting a problem, please checkout our [FAQ](https://openomics.github.io/baseline/faq/questions/) prior to [opening an issue on Github](https://github.com/OpenOmics/baseline/issues). + +## Dependencies +**Requires:** `singularity>=3.5` `snakemake>=6.0` + +At the current moment, the pipeline uses a mixture of enviroment modules and docker images; however, this will be changing soon! In the very near future, the pipeline will only use docker images. With that being said, [snakemake](https://snakemake.readthedocs.io/en/stable/getting_started/installation.html) and [singularity](https://singularity.lbl.gov/all-releases) must be installed on the target system. Snakemake orchestrates the execution of each step in the pipeline. To guarantee the highest level of reproducibility, each step of the pipeline will rely on versioned images from [DockerHub](https://hub.docker.com/orgs/nciccbr/repositories). Snakemake uses singularity to pull these images onto the local filesystem prior to job execution, and as so, snakemake and singularity will be the only two dependencies in the future. + +## Installation +Please clone this repository to your local filesystem using the following command: +```bash +# Clone Repository from Github +git clone https://github.com/OpenOmics/baseline.git +# Change your working directory +cd baseline/ +# Add dependencies to $PATH +# Biowulf users should run +module load snakemake singularity +# Get usage information +./baseline -h +``` + +## Contribute +This site is a living document, created for and by members like you. baseline is maintained by the members of OpenOmics and is improved by continous feedback! 
We encourage you to contribute new content and make improvements to existing content via pull request to our [GitHub repository](https://github.com/OpenOmics/baseline). + + +## Cite + +If you use this software, please cite it as below: + +
+ @BibText + +```text +Citation coming soon! +``` + +
+ +
+ @APA + +```text +Citation coming soon! +``` + +
+ + + + +## References +**1.** Kurtzer GM, Sochat V, Bauer MW (2017). Singularity: Scientific containers for mobility of compute. PLoS ONE 12(5): e0177459. +**2.** Koster, J. and S. Rahmann (2018). "Snakemake-a scalable bioinformatics workflow engine." Bioinformatics 34(20): 3600. diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..6e8bf73 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +0.1.0 diff --git a/baseline b/baseline new file mode 100755 index 0000000..bb3cd87 --- /dev/null +++ b/baseline @@ -0,0 +1,700 @@ +#!/usr/bin/env python3 +# -*- coding: UTF-8 -*- + +""" +ABOUT: This is the main entry for the pipeline. +REQUIRES: + - python>=3.6 + - snakemake (recommended>=6.0.0) + - singularity (recommended==latest) +DISCLAIMER: + PUBLIC DOMAIN NOTICE + NIAID Collaborative Bioinformatics Resource (NCBR) + National Institute of Allergy and Infectious Diseases (NIAID) +This software/database is a "United States Government Work" under +the terms of the United States Copyright Act. It was written as +part of the author's official duties as a United States Government +employee and thus cannot be copyrighted. This software is freely +available to the public for use. +Although all reasonable efforts have been taken to ensure the +accuracy and reliability of the software and data, NCBR do not and +cannot warrant the performance or results that may be obtained by +using this software or data. NCBR and NIH disclaim all warranties, +express or implied, including warranties of performance, +merchantability or fitness for any particular purpose. +Please cite the author and NIH resources like the "Biowulf Cluster" +in any work or product based on this material. +USAGE: + $ baseline [OPTIONS] +EXAMPLE: + $ baseline run --input *.R?.fastq.gz --output output/ +""" + +# Python standard library +from __future__ import print_function +import sys, os, subprocess, re, json, textwrap + +# 3rd party imports from pypi +import argparse # potential python3 3rd party package, added in python/3.5 + +# Local imports +from src import version +from src.run import init, setup, bind, dryrun, runner +from src.shells import bash +from src.utils import ( + Colors, + err, + exists, + fatal, + hashed, + permissions, + check_cache, + require) + + +# Pipeline Metadata +__version__ = version +__authors__ = 'Skyler Kuhn' +__email__ = 'skyler.kuhn@nih.gov' +__home__ = os.path.dirname(os.path.abspath(__file__)) +_name = os.path.basename(sys.argv[0]) +_description = 'An awesome baseline pipeline' + + +def unlock(sub_args): + """Unlocks a previous runs output directory. If snakemake fails ungracefully, + it maybe required to unlock the working directory before proceeding again. + This is rare but it does occasionally happen. Maybe worth add a --force + option to delete the '.snakemake/' directory in the future. + @param sub_args : + Parsed arguments for unlock sub-command + """ + print("Unlocking the pipeline's output directory...") + outdir = sub_args.output + + try: + unlock_output = subprocess.check_output([ + 'snakemake', '--unlock', + '--cores', '1', + '--configfile=config.json' + ], cwd = outdir, + stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + # Unlocking process returned a non-zero exit code + sys.exit("{}\n{}".format(e, e.output)) + + print("Successfully unlocked the pipeline's working directory!") + + +def run(sub_args): + """Initialize, setup, and run the pipeline. 
+ Calls initialize() to create output directory and copy over pipeline resources, + setup() to create the pipeline config file, dryrun() to ensure their are no issues + before running the pipeline, and finally run() to execute the Snakemake workflow. + @param sub_args : + Parsed arguments for run sub-command + """ + # Step 0. Check for required dependencies + # The pipelines has only two requirements: + # snakemake and singularity + require(['snakemake', 'singularity'], ['snakemake', 'singularity']) + + # Step 1. Initialize working directory, + # copy over required resources to run + # the pipeline + git_repo = __home__ + input_files = init( + repo_path = git_repo, + output_path = sub_args.output, + links = sub_args.input + ) + + # Step 2. Setup pipeline for execution, + # dynamically create config.json config + # file from user inputs and base config + # templates + config = setup(sub_args, + ifiles = input_files, + repo_path = git_repo, + output_path = sub_args.output + ) + + # Step 3. Resolve docker/singularity bind + # paths from the config file. + bindpaths = bind( + sub_args, + config = config + ) + + config['bindpaths'] = bindpaths + + # Step 4. Save config to output directory + with open(os.path.join(sub_args.output, 'config.json'), 'w') as fh: + json.dump(config, fh, indent = 4, sort_keys = True) + + # Optional Step: Dry-run pipeline + if sub_args.dry_run: + # Dryrun pipeline + dryrun_output = dryrun(outdir = sub_args.output) # python3 returns byte-string representation + print("\nDry-running {} pipeline:\n{}".format(_name, dryrun_output.decode("utf-8"))) + sys.exit(0) + + # Step 5. Orchestrate pipeline execution, + # run pipeline in locally on a compute node + # for debugging purposes or submit the master + # job to the job scheduler, SLURM, and create + # logging file + if not exists(os.path.join(sub_args.output, 'logfiles')): + # Create directory for logfiles + os.makedirs(os.path.join(sub_args.output, 'logfiles')) + if sub_args.mode == 'local': + log = os.path.join(sub_args.output, 'logfiles', 'snakemake.log') + else: + log = os.path.join(sub_args.output, 'logfiles', 'master.log') + logfh = open(log, 'w') + mjob = runner(mode = sub_args.mode, + outdir = sub_args.output, + # additional_bind_paths = all_bind_paths, + alt_cache = sub_args.singularity_cache, + threads = int(sub_args.threads), + jobname = sub_args.job_name, + submission_script=os.path.join(__home__, 'src', 'run.sh'), + logger = logfh, + additional_bind_paths = ",".join(bindpaths), + tmp_dir = sub_args.tmp_dir, + ) + + # Step 6. Wait for subprocess to complete, + # this is blocking and not asynchronous + if not sub_args.silent: + print("\nRunning {} pipeline in '{}' mode...".format(_name, sub_args.mode)) + mjob.wait() + logfh.close() + + # Step 7. Relay information about submission + # of the master job or the exit code of the + # pipeline that ran in local mode + if sub_args.mode == 'local': + if int(mjob.returncode) == 0: + print('{} pipeline has successfully completed'.format(_name)) + else: + fatal('{} pipeline failed. 
Please see {} for more information.'.format(_name, + os.path.join(sub_args.output, 'logfiles', 'snakemake.log'))) + elif sub_args.mode == 'slurm': + jobid = open(os.path.join(sub_args.output, 'logfiles', 'mjobid.log')).read().strip() + if not sub_args.silent: + if int(mjob.returncode) == 0: + print('Successfully submitted master job: ', end="") + else: + fatal('Error occurred when submitting the master job.') + print(jobid) + + +def cache(sub_args): + """Caches remote resources or reference files stored on DockerHub and S3. + Local SIFs will be created from images defined in 'config/containers/images.json'. + @TODO: add option to cache other shared S3 resources (i.e. kraken db and fqscreen indices) + @param sub_args : + Parsed arguments for unlock sub-command + """ + print(sub_args) + fatal('NotImplementedError... Comming Soon!') + + +def parsed_arguments(name, description): + """Parses user-provided command-line arguments. Requires argparse and textwrap + package. argparse was added to standard lib in python 3.5 and textwrap was added + in python 3.5. To create custom help formatting for subparsers a docstring is + used create the help message for required options. argparse does not support named + subparser groups, which is normally what would be used to accomphish this reformatting. + As so, the help message for require options must be suppressed. If a new required arg + is added to a subparser, it must be added to the docstring and the usage statement + also must be updated. + @param name : + Name of the pipeline or command-line tool + @param description : + Short description of pipeline or command-line tool + """ + # Add styled name and description + c = Colors + styled_name = "{0}{1}{2}baseline{3}".format(c.bold, c.bg_black, c.cyan, c.end) + description = "{0}{1}{2}".format(c.bold, description, c.end) + + # Create a top-level parser + parser = argparse.ArgumentParser(description = '{}: {}'.format(styled_name, description)) + + # Adding Verison information + parser.add_argument('--version', action = 'version', version='%(prog)s {}'.format(__version__)) + + # Create sub-command parser + subparsers = parser.add_subparsers(help='List of available sub-commands') + + # Sub-parser for the "run" sub-command + # Grouped sub-parser arguments are currently + # not supported: https://bugs.python.org/issue9341 + # Here is a work around to create more useful help message for named + # options that are required! Please note: if a required arg is added the + # description below should be updated (i.e. update usage and add new option) + required_run_options = textwrap.dedent("""\ + {0}: {1} + + {3}{4}Synopsis:{5} + $ {2} run [--help] \\ + [--dry-run] [--job-name JOB_NAME] [--mode {{slurm,local}}] \\ + [--sif-cache SIF_CACHE] [--singularity-cache SINGULARITY_CACHE] \\ + [--silent] [--threads THREADS] [--tmp-dir TMP_DIR] \\ + --input INPUT [INPUT ...] \\ + --output OUTPUT + + Optional arguments are shown in square brackets above. + + {3}{4}Description:{5} + To run the ...long pipeline name with your data raw data, please + provide a space seperated list of FastQ (globbing is supported) and an output + directory to store results. + + {3}{4}Required arguments:{5} + --input INPUT [INPUT ...] + Input FastQ file(s) to process. The pipeline does NOT + support single-end data. FastQ files for one or more + samples can be provided. Multiple input FastQ files + should be seperated by a space. Globbing for multiple + file is also supported. 
+ Example: --input .tests/*.R?.fastq.gz + --output OUTPUT + Path to an output directory. This location is where + the pipeline will create all of its output files, also + known as the pipeline's working directory. If the user + provided working directory has not been initialized, + it will be created automatically. + Example: --output /data/$USER/output + + {3}{4}Analysis options:{5} + ...coming soon! + + {3}{4}Orchestration options:{5} + --mode {{slurm,local}} + Method of execution. Defines the mode of execution. + Vaild options for this mode include: local or slurm. + Additional modes of exection are coming soon, default: + slurm. + Here is a brief description of each mode: + • local: uses local method of execution. local runs + will run serially on compute instance. This is useful + for testing, debugging, or when a users does not have + access to a high performance computing environment. + If this option is not provided, it will default to a + slurm mode of execution. + • slurm: uses slurm execution backend. This method + will submit jobs to a cluster using sbatch. It is + recommended running the pipeline in this mode as it + will be significantly faster. + Example: --mode slurm + --job-name JOB_NAME + Overrides the name of the pipeline's master job. When + submitting the pipeline to a jobscheduler, this option + overrides the default name of the master job. This can + be useful for tracking the progress or status of a run, + default: pl:{2}. + Example: --job-name {2}_03-14.1592 + --dry-run + Does not execute anything. Only displays what steps in + the pipeline remain or will be run. + Example: --dry-run + --silent + Silence standard output. This will reduces the amount + of information displayed to standard output when the + master job is submitted to the job scheduler. Only the + job id of the master job is returned. + Example: --silent + --singularity-cache SINGULARITY_CACHE + Overrides the $SINGULARITY_CACHEDIR variable. Images + from remote registries are cached locally on the file + system. By default, the singularity cache is set to: + '/path/to/output/directory/.singularity/'. Please note + that this cache cannot be shared across users. + Example: --singularity-cache /data/$USER + --sif-cache SIF_CACHE + Path where a local cache of SIFs are stored. This cache + can be shared across users if permissions are properly + setup. If a SIF does not exist in the SIF cache, the + image will be pulled from Dockerhub. {2} cache + sub command can be used to create a local SIF cache. + Please see {2} cache for more information. + Example: --sif-cache /data/$USER/sifs/ + --tmp-dir TMP_DIR + Path on the file system for writing temporary output + files. By default, the temporary directory is set to + '/lscratch/$SLURM_JOBID' for backwards compatibility + with the NIH's Biowulf cluster; however, if you are + running the pipeline on another cluster, this option + will need to be specified. Ideally, this path should + point to a dedicated location on the filesystem for + writing tmp files. On many systems, this location is + set to somewhere in /scratch. If you need to inject a + variable into this string that should NOT be expanded, + please quote this options value in single quotes. + Example: --tmp-dir '/scratch/$USER/' + --threads THREADS + Max number of threads for local processes. It is + recommended setting this vaule to the maximum number + of CPUs available on the host machine, default: 2. 
+ Example: --threads: 16 + + {3}{4}Misc Options:{5} + -h, --help Show usage information, help message, and exit. + Example: --help + """.format(styled_name, description, name, c.bold, c.url, c.end, c.italic)) + + # Display example usage in epilog + run_epilog = textwrap.dedent("""\ + {2}{3}Example:{4} + # Step 1.) Grab an interactive node, + # do not run on head node! + srun -N 1 -n 1 --time=1:00:00 --mem=8gb --cpus-per-task=2 --pty bash + module purge + module load singularity snakemake + + # Step 2A.) Dry-run the pipeline + ./{0} run --input .tests/*.R?.fastq.gz \\ + --output /data/$USER/output \\ + --mode slurm \\ + --dry-run + + # Step 2B.) Run the {0} pipeline + # The slurm mode will submit jobs to + # the cluster. It is recommended running + # the pipeline in this mode. + ./{0} run --input .tests/*.R?.fastq.gz \\ + --output /data/$USER/output \\ + --mode slurm + + {2}{3}Version:{4} + {1} + """.format(name, __version__, c.bold, c.url, c.end)) + + # Supressing help message of required args to overcome no sub-parser named groups + subparser_run = subparsers.add_parser('run', + help = 'Run the {} pipeline with input files.'.format(name), + usage = argparse.SUPPRESS, + formatter_class=argparse.RawDescriptionHelpFormatter, + description = required_run_options, + epilog = run_epilog, + add_help=False + ) + + # Required Arguments + # Input FastQ files + subparser_run.add_argument( + '--input', + # Check if the file exists and if it is readable + type = lambda file: permissions(parser, file, os.R_OK), + required = True, + nargs = '+', + help = argparse.SUPPRESS + ) + + # Output Directory, i.e + # working directory + subparser_run.add_argument( + '--output', + type = lambda option: os.path.abspath(os.path.expanduser(option)), + required = True, + help = argparse.SUPPRESS + ) + + # Optional Arguments + # Add custom help message + subparser_run.add_argument( + '-h', '--help', + action='help', + help=argparse.SUPPRESS + ) + + # Analysis options + # ... add here + + # Orchestration Options + # Execution Method, run locally + # on a compute node or submit to + # a supported job scheduler, etc. 
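    # Note: the per-option help text below is suppressed (help=argparse.SUPPRESS)
    # because the full descriptions already live in the dedented usage string
    # above; argparse does not support named argument groups for subparsers.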
+ subparser_run.add_argument( + '--mode', + type = str, + required = False, + default = "slurm", + choices = ['slurm', 'local'], + help = argparse.SUPPRESS + ) + + # Name of master job + subparser_run.add_argument( + '--job-name', + type = str, + required = False, + default = 'pl:{}'.format(name), + help = argparse.SUPPRESS + ) + + # Dry-run + # Do not execute the workflow, + # prints what steps remain + subparser_run.add_argument( + '--dry-run', + action = 'store_true', + required = False, + default = False, + help = argparse.SUPPRESS + ) + + # Silent output mode + subparser_run.add_argument( + '--silent', + action = 'store_true', + required = False, + default = False, + help = argparse.SUPPRESS + ) + + # Singularity cache directory, + # default uses output directory + subparser_run.add_argument( + '--singularity-cache', + type = lambda option: check_cache(parser, os.path.abspath(os.path.expanduser(option))), + required = False, + help = argparse.SUPPRESS + ) + + # Local SIF cache directory, + # default pull from Dockerhub + subparser_run.add_argument( + '--sif-cache', + type = lambda option: os.path.abspath(os.path.expanduser(option)), + required = False, + help = argparse.SUPPRESS + ) + + # Base directory to write + # temporary/intermediate files + subparser_run.add_argument( + '--tmp-dir', + type = str, + required = False, + default = '/lscratch/$SLURM_JOBID/', + help = argparse.SUPPRESS + ) + + # Number of threads for the + # pipeline's main proceess + # This is only applicable for + # local rules or when running + # in local mode. + subparser_run.add_argument( + '--threads', + type = int, + required = False, + default = 2, + help = argparse.SUPPRESS + ) + + # Sub-parser for the "unlock" sub-command + # Grouped sub-parser arguments are currently + # not supported: https://bugs.python.org/issue9341 + # Here is a work around to create more useful help message for named + # options that are required! Please note: if a required arg is added the + # description below should be updated (i.e. update usage and add new option) + required_unlock_options = textwrap.dedent("""\ + {0}: {1} + + {3}{4}Synopsis:{5} + $ {2} unlock [-h] --output OUTPUT + + Optional arguments are shown in square brackets above. + + {3}{4}Description:{5} + If the pipeline fails ungracefully, it maybe required to unlock + the working directory before proceeding again. Please verify that + the pipeline is not running before running this command. If the + pipeline is still running, the workflow manager will report the + working directory is locked. This is normal behavior. Do NOT run + this command if the pipeline is still running. + + {3}{4}Required arguments:{5} + --output OUTPUT Path to a previous run's output directory + to unlock. This will remove a lock on the + working directory. Please verify that the + pipeline is not running before running + this command. + Example: --output /data/$USER/output + + {3}{4}Misc Options:{5} + -h, --help Show usage information, help message, + and exit. 
+ Example: --help + """.format(styled_name, description, name, c.bold, c.url, c.end)) + + # Display example usage in epilog + unlock_epilog = textwrap.dedent("""\ + {2}{3}Example:{4} + # Unlock output directory of pipeline + {0} unlock --output /data/$USER/output + + {2}{3}Version:{4} + {1} + """.format(name, __version__, c.bold, c.url, c.end)) + + # Supressing help message of required args to overcome no sub-parser named groups + subparser_unlock = subparsers.add_parser( + 'unlock', + help = 'Unlocks a previous runs output directory.', + usage = argparse.SUPPRESS, + formatter_class=argparse.RawDescriptionHelpFormatter, + description = required_unlock_options, + epilog = unlock_epilog, + add_help = False + ) + + # Required Arguments + # Output Directory (analysis working directory) + subparser_unlock.add_argument( + '--output', + type = str, + required = True, + help = argparse.SUPPRESS + ) + + # Add custom help message + subparser_unlock.add_argument( + '-h', '--help', + action='help', + help=argparse.SUPPRESS + ) + + + # Sub-parser for the "cache" sub-command + # Grouped sub-parser arguments are + # not supported: https://bugs.python.org/issue9341 + # Here is a work around to create more useful help message for named + # options that are required! Please note: if a required arg is added the + # description below should be updated (i.e. update usage and add new option) + required_cache_options = textwrap.dedent("""\ + {0}: {1} + + {3}{4}Synopsis:{5} + $ {2} cache [-h] [--dry-run] --sif-cache SIF_CACHE + + Optional arguments are shown in square brackets above. + + {3}{4}Description:{5} + Creates a local cache resources hosted on DockerHub or AWS S3. + These resources are normally pulled onto the filesystem when the + pipeline runs; however, due to network issues or DockerHub pull + rate limits, it may make sense to pull the resources once so a + shared cache can be created. It is worth noting that a singularity + cache cannot normally be shared across users. Singularity strictly + enforces that a cache is owned by the user. To get around this + issue, the cache subcommand can be used to create local SIFs on + the filesystem from images on DockerHub. + + {3}{4}Required arguments:{5} + --sif-cache SIF_CACHE + Path where a local cache of SIFs will be + stored. Images defined in containers.json + will be pulled into the local filesystem. + The path provided to this option can be + passed to the --sif-cache option of the + run sub command. Please see {2} + run sub command for more information. + Example: --sif-cache /data/$USER/cache + + {3}{4}Orchestration options:{5} + --dry-run Does not execute anything. Only displays + what remote resources would be pulled. + Example: --dry-run + + {3}{4}Misc Options:{5} + -h, --help Show usage information, help message, + and exits. 
+ Example: --help + + """.format(styled_name, description, name, c.bold, c.url, c.end)) + + # Display example usage in epilog + cache_epilog = textwrap.dedent("""\ + {2}{3}Example:{4} + # Cache remote resources of pipeline + {0} cache --sif-cache /data/$USER/cache + + {2}{3}Version:{4} + {1} + """.format(name, __version__, c.bold, c.url, c.end)) + + # Supressing help message of required args to overcome no sub-parser named groups + subparser_cache = subparsers.add_parser( + 'cache', + help = 'Cache remote resources locally.', + usage = argparse.SUPPRESS, + formatter_class=argparse.RawDescriptionHelpFormatter, + description = required_cache_options, + epilog = cache_epilog, + add_help = False + ) + + # Required Arguments + # Output Directory (analysis working directory) + subparser_cache.add_argument( + '--sif-cache', + type = lambda option: os.path.abspath(os.path.expanduser(option)), + required = True, + help = argparse.SUPPRESS + ) + + # Optional Arguments + # Dry-run cache command (do not pull any remote resources) + subparser_cache.add_argument( + '--dry-run', + action = 'store_true', + required = False, + default = False, + help=argparse.SUPPRESS + ) + + # Add custom help message + subparser_cache.add_argument( + '-h', '--help', + action='help', + help=argparse.SUPPRESS + ) + + # Define handlers for each sub-parser + subparser_run.set_defaults(func = run) + subparser_unlock.set_defaults(func = unlock) + subparser_cache.set_defaults(func = cache) + + # Parse command-line args + args = parser.parse_args() + return args + + +def main(): + + # Sanity check for usage + if len(sys.argv) == 1: + # Nothing was provided + fatal('Invalid usage: {} [-h] [--version] ...'.format(_name)) + + # Collect args for sub-command + args = parsed_arguments( + name = _name, + description = _description + ) + + # Display version information + err('{} ({})'.format(_name, __version__)) + + # Mediator method to call sub-command's set handler function + args.func(args) + + +if __name__ == '__main__': + main() diff --git a/config/cluster.json b/config/cluster.json new file mode 100644 index 0000000..545162f --- /dev/null +++ b/config/cluster.json @@ -0,0 +1,8 @@ +{ + "__default__": { + "threads": "4", + "mem": "8g", + "partition": "norm", + "time": "0-04:00:00" + } +} diff --git a/config/config.json b/config/config.json new file mode 100644 index 0000000..7056c94 --- /dev/null +++ b/config/config.json @@ -0,0 +1,4 @@ +{ + "options": { + } +} diff --git a/config/containers.json b/config/containers.json new file mode 100644 index 0000000..46aecee --- /dev/null +++ b/config/containers.json @@ -0,0 +1,4 @@ +{ + "images": { + } +} diff --git a/config/genome.json b/config/genome.json new file mode 100644 index 0000000..00c1ed7 --- /dev/null +++ b/config/genome.json @@ -0,0 +1,4 @@ +{ + "references": { + } +} diff --git a/config/modules.json b/config/modules.json new file mode 100644 index 0000000..7a587e7 --- /dev/null +++ b/config/modules.json @@ -0,0 +1,4 @@ +{ + "tools": { + } +} diff --git a/docker/README.md b/docker/README.md new file mode 100644 index 0000000..2f30bcd --- /dev/null +++ b/docker/README.md @@ -0,0 +1,35 @@ +## Steps for Building Docker Images + +Directly below are instructions for building an image using the provided Dockerfile: + +```bash +# See listing of images on computer +docker image ls + +# Build from Dockerfile +docker build --no-cache -f example.dockerfile --tag=example:v0.1.0 . 
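# Note: --no-cache re-runs every Dockerfile instruction instead of reusing
# cached layers; -f selects the Dockerfile and the trailing '.' sets the
# build context to the current directory.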
+ +# Testing, take a peek inside +docker run -ti example:v0.1.0 /bin/bash + +# Updating Tag before pushing to DockerHub +docker tag example:v0.1.0 skchronicles/example:v0.1.0 +docker tag example:v0.1.0 skchronicles/example # latest + +# Check out new tag(s) +docker image ls + +# Push new tagged image to DockerHub +docker push skchronicles/example:v0.1.0 +docker push skchronicles/example:latest +``` + +### Other Recommended Steps + +Scan your image for known vulnerabilities: + +```bash +docker scan example:v0.1.0 +``` + +> **Please Note**: Any references to `skchronicles` should be replaced your username if you would also like to push the image to a non-org account. diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..7e964c7 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,33 @@ +# Build documentation + +> **Please Note:** When a commit is pushed to the `docs/` directory, it triggers a [github actions workflow](https://github.com/OpenOmics/baseline/actions) to build the static-site and push it to the gh-pages branch. + +### Installation +```bash +# Clone the Repository +git clone https://github.com/OpenOmics/baseline.git +cd baseline/ +# Create a virtual environment +python3 -m venv .venv +# Activate the virtual environment +. .venv/bin/activate +# Update pip +pip install --upgrade pip +# Download Dependencies +pip install -r docs/requirements.txt +``` + +### Preview while editing +MkDocs includes a previewing server, so you can view your updates live and as you write your documentation. The server will automatically rebuild the site upon editing and saving a file. +```bash +# Activate the virtual environment +. .venv/bin/activate +# Start serving your documentation +mkdocs serve +``` + +### Build static site +Once you are content with your changes, you can build the static site: +```bash +mkdocs build +``` diff --git a/docs/assets/favicon/favicon.ico b/docs/assets/favicon/favicon.ico new file mode 100644 index 0000000..e85006a Binary files /dev/null and b/docs/assets/favicon/favicon.ico differ diff --git a/docs/assets/icons/doc-book.svg b/docs/assets/icons/doc-book.svg new file mode 100644 index 0000000..10ced62 --- /dev/null +++ b/docs/assets/icons/doc-book.svg @@ -0,0 +1,9 @@ + + + + + + + + + diff --git a/docs/css/extra.css b/docs/css/extra.css new file mode 100644 index 0000000..5c0c7aa --- /dev/null +++ b/docs/css/extra.css @@ -0,0 +1,24 @@ +@keyframes heart { + 0%, 40%, 80%, 100% { + transform: scale(1); + } + 20%, 60% { + transform: scale(1.15); + } +} + +.heart { + animation: heart 1500ms infinite; +} + +[data-md-color-scheme="slate"] { + --md-primary-fg-color: #1A1B23DE; + --md-typeset-a-color: #b1b9ed; +} + +.md-typeset .admonition.custom-grid-button, +.md-typeset details.custom-grid-button { + border-color: var(--md-code-bg-color); + border-width: 2px; + width: 45%; +} \ No newline at end of file diff --git a/docs/faq/questions.md b/docs/faq/questions.md new file mode 100644 index 0000000..40c323d --- /dev/null +++ b/docs/faq/questions.md @@ -0,0 +1,4 @@ +# Frequently Asked Questions + +This page is still under construction. If you need immediate help, please [open an issue](https://github.com/OpenOmics/baseline/issues) on Github! + diff --git a/docs/index.md b/docs/index.md new file mode 100644 index 0000000..086219d --- /dev/null +++ b/docs/index.md @@ -0,0 +1,91 @@ +
+ +

baseline 🔬

+ + long pipeline name
+ + tests + + + docs + + + GitHub issues + + + GitHub license + + +

+ This is the home of the pipeline, baseline. Its long-term goals: to accurately ...insert goal, to infer ...insert goal, and to boldly ...insert goal like no pipeline before! +

+ +
+ + +## Overview +Welcome to baseline's documentation! This guide is the main source of documentation for users that are getting started with the [long pipeline name](https://github.com/OpenOmics/baseline/). + +The **`./baseline`** pipeline is composed several inter-related sub commands to setup and run the pipeline across different systems. Each of the available sub commands perform different functions: + +
+ +!!! inline custom-grid-button "" + + [baseline run](usage/run.md) + Run the baseline pipeline with your input files. + +!!! inline custom-grid-button "" + + [baseline unlock](usage/unlock.md) + Unlocks a previous runs output directory. + +
+ +
+ + +!!! inline custom-grid-button "" + + [baseline install](usage/install.md) + Download remote reference files locally. + + +!!! inline custom-grid-button "" + + [baseline cache](usage/cache.md) + Cache remote software containers locally. + +
+ +**baseline** is a comprehensive ...insert long description. It relies on technologies like [Singularity1](https://singularity.lbl.gov/) to maintain the highest-level of reproducibility. The pipeline consists of a series of data processing and quality-control steps orchestrated by [Snakemake2](https://snakemake.readthedocs.io/en/stable/), a flexible and scalable workflow management system, to submit jobs to a cluster. + +The pipeline is compatible with data generated from Illumina short-read sequencing technologies. As input, it accepts a set of FastQ files and can be run locally on a compute instance or on-premise using a cluster. A user can define the method or mode of execution. The pipeline can submit jobs to a cluster using a job scheduler like SLURM (more coming soon!). A hybrid approach ensures the pipeline is accessible to all users. + +Before getting started, we highly recommend reading through the [usage](usage/run.md) section of each available sub command. + +For more information about issues or trouble-shooting a problem, please checkout our [FAQ](faq/questions.md) prior to [opening an issue on Github](https://github.com/OpenOmics/baseline/issues). + +## Contribute + +This site is a living document, created for and by members like you. baseline is maintained by the members of NCBR and is improved by continous feedback! We encourage you to contribute new content and make improvements to existing content via pull request to our [GitHub repository :octicons-heart-fill-24:{ .heart }](https://github.com/OpenOmics/baseline). + +## Citation + +If you use this software, please cite it as below: + +=== "BibTex" + + ``` + Citation coming soon! + ``` + +=== "APA" + + ``` + Citation coming soon! + ``` + +## References +**1.** Kurtzer GM, Sochat V, Bauer MW (2017). Singularity: Scientific containers for mobility of compute. PLoS ONE 12(5): e0177459. +**2.** Koster, J. and S. Rahmann (2018). "Snakemake-a scalable bioinformatics workflow engine." Bioinformatics 34(20): 3600. diff --git a/docs/license.md b/docs/license.md new file mode 100644 index 0000000..670504a --- /dev/null +++ b/docs/license.md @@ -0,0 +1,21 @@ +# MIT License + +*Copyright (c) 2023 OpenOmics* + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 0000000..dbe98a1 --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,34 @@ +babel>=2.9.1 +click==7.1.2 +future==0.18.2 +gitdb==4.0.5 +GitPython==3.1.7 +htmlmin==0.1.12 +importlib-metadata>=3.10 +Jinja2==2.11.3 +joblib==0.16.0 +jsmin==3.0.0 +livereload==2.6.1 +lunr==0.5.8 +Markdown==3.2.2 +MarkupSafe==1.1.1 +mkdocs>=1.3.0 +mkdocs-awesome-pages-plugin==2.2.1 +mkdocs-git-revision-date-localized-plugin==0.7 +mkdocs-material +mkdocs-material-extensions +mkdocs-minify-plugin==0.3.0 +mkdocs-redirects==1.0.1 +nltk>=3.6.6 +pygments>=2.12 +pymdown-extensions +pytz==2020.1 +PyYAML>=5.4 +regex +six==1.15.0 +smmap==3.0.4 +tornado==6.0.4 +tqdm==4.48.2 +zipp==3.1.0 +mkdocs-git-revision-date-plugin +mike diff --git a/docs/usage/cache.md b/docs/usage/cache.md new file mode 100644 index 0000000..82c433f --- /dev/null +++ b/docs/usage/cache.md @@ -0,0 +1,72 @@ +# baseline cache + +## 1. About +The `baseline` executable is composed of several inter-related sub commands. Please see `baseline -h` for all available options. + +This part of the documentation describes options and concepts for baseline cache sub command in more detail. With minimal configuration, the **`cache`** sub command enables you to cache remote resources for the baseline pipeline. Caching remote resources allows the pipeline to run in an offline mode. The cache sub command can also be used to pull our pre-built reference bundles onto a new cluster or target system. + +The cache sub command creates local cache on the filesysytem for resources hosted on DockerHub or AWS S3. These resources are normally pulled onto the filesystem when the pipeline runs; however, due to network issues or DockerHub pull rate limits, it may make sense to pull the resources once so a shared cache can be created and re-used. It is worth noting that a singularity cache cannot normally be shared across users. Singularity strictly enforces that its cache is owned by the user. To get around this issue, the cache subcommand can be used to create local SIFs on the filesystem from images on DockerHub. + +## 2. Synopsis + +Coming Soon! + + \ No newline at end of file diff --git a/docs/usage/install.md b/docs/usage/install.md new file mode 100644 index 0000000..548df6d --- /dev/null +++ b/docs/usage/install.md @@ -0,0 +1,118 @@ +# baseline install + +## 1. About + +The `baseline` executable is composed of several inter-related sub commands. Please see `baseline -h` for all available options. + +This part of the documentation describes options and concepts for baseline install sub command in more detail. + +This page is still under construction 👷, more information is coming soon! + + \ No newline at end of file diff --git a/docs/usage/run.md b/docs/usage/run.md new file mode 100644 index 0000000..633b792 --- /dev/null +++ b/docs/usage/run.md @@ -0,0 +1,173 @@ +# baseline run + +## 1. About +The `baseline` executable is composed of several inter-related sub commands. Please see `baseline -h` for all available options. + +This part of the documentation describes options and concepts for baseline run sub command in more detail. With minimal configuration, the **`run`** sub command enables you to start running baseline pipeline. + +Setting up the baseline pipeline is fast and easy! In its most basic form, baseline run only has *two required inputs*. + +## 2. 
Synopsis +```text +$ baseline run [--help] \ + [--mode {slurm,local}] [--job-name JOB_NAME] [--batch-id BATCH_ID] \ + [--tmp-dir TMP_DIR] [--silent] [--sif-cache SIF_CACHE] \ + [--singularity-cache SINGULARITY_CACHE] \ + [--dry-run] [--threads THREADS] \ + --input INPUT [INPUT ...] \ + --output OUTPUT +``` + +The synopsis for each command shows its arguments and their usage. Optional arguments are shown in square brackets. + +A user **must** provide a list of FastQ (globbing is supported) to analyze via `--input` argument and an output directory to store results via `--output` argument. + +Use you can always use the `-h` option for information on a specific command. + +### 2.1 Required arguments + +Each of the following arguments are required. Failure to provide a required argument will result in a non-zero exit-code. + + `--input INPUT [INPUT ...]` +> **Input FastQ or BAM file(s).** +> *type: file(s)* +> +> One or more FastQ files can be provided. The pipeline does NOT support single-end data. From the command-line, each input file should seperated by a space. Globbing is supported! This makes selecting FastQ files easy. Input FastQ files should always be gzipp-ed. +> +> ***Example:*** `--input .tests/*.R?.fastq.gz` + +--- + `--output OUTPUT` +> **Path to an output directory.** +> *type: path* +> +> This location is where the pipeline will create all of its output files, also known as the pipeline's working directory. If the provided output directory does not exist, it will be created automatically. +> +> ***Example:*** `--output /data/$USER/baseline_out` + +### 2.2 Analysis options + +Each of the following arguments are optional, and do not need to be provided. + +...add non-required analysis options + +### 2.3 Orchestration options + +Each of the following arguments are optional, and do not need to be provided. + + `--dry-run` +> **Dry run the pipeline.** +> *type: boolean flag* +> +> Displays what steps in the pipeline remain or will be run. Does not execute anything! +> +> ***Example:*** `--dry-run` + +--- + `--silent` +> **Silence standard output.** +> *type: boolean flag* +> +> Reduces the amount of information directed to standard output when submitting master job to the job scheduler. Only the job id of the master job is returned. +> +> ***Example:*** `--silent` + +--- + `--mode {slurm,local}` +> **Execution Method.** +> *type: string* +> *default: slurm* +> +> Execution Method. Defines the mode or method of execution. Vaild mode options include: slurm or local. +> +> ***slurm*** +> The slurm execution method will submit jobs to the [SLURM workload manager](https://slurm.schedmd.com/). It is recommended running baseline in this mode as execution will be significantly faster in a distributed environment. This is the default mode of execution. +> +> ***local*** +> Local executions will run serially on compute instance. This is useful for testing, debugging, or when a users does not have access to a high performance computing environment. If this option is not provided, it will default to a local execution mode. +> +> ***Example:*** `--mode slurm` + +--- + `--job-name JOB_NAME` +> **Set the name of the pipeline's master job.** +> *type: string* +> *default: pl:baseline* +> +> When submitting the pipeline to a job scheduler, like SLURM, this option always you to set the name of the pipeline's master job. By default, the name of the pipeline's master job is set to "pl:baseline". 
+> +> ***Example:*** `--job-name pl_id-42` + +--- + `--singularity-cache SINGULARITY_CACHE` +> **Overrides the $SINGULARITY_CACHEDIR environment variable.** +> *type: path* +> *default: `--output OUTPUT/.singularity`* +> +> Singularity will cache image layers pulled from remote registries. This ultimately speeds up the process of pull an image from DockerHub if an image layer already exists in the singularity cache directory. By default, the cache is set to the value provided to the `--output` argument. Please note that this cache cannot be shared across users. Singularity strictly enforces you own the cache directory and will return a non-zero exit code if you do not own the cache directory! See the `--sif-cache` option to create a shareable resource. +> +> ***Example:*** `--singularity-cache /data/$USER/.singularity` + +--- + `--sif-cache SIF_CACHE` +> **Path where a local cache of SIFs are stored.** +> *type: path* +> +> Uses a local cache of SIFs on the filesystem. This SIF cache can be shared across users if permissions are set correctly. If a SIF does not exist in the SIF cache, the image will be pulled from Dockerhub and a warning message will be displayed. The `baseline cache` subcommand can be used to create a local SIF cache. Please see `baseline cache` for more information. This command is extremely useful for avoiding DockerHub pull rate limits. It also remove any potential errors that could occur due to network issues or DockerHub being temporarily unavailable. We recommend running baseline with this option when ever possible. +> +> ***Example:*** `--singularity-cache /data/$USER/SIFs` + +--- + `--threads THREADS` +> **Max number of threads for each process.** +> *type: int* +> *default: 2* +> +> Max number of threads for each process. This option is more applicable when running the pipeline with `--mode local`. It is recommended setting this vaule to the maximum number of CPUs available on the host machine. +> +> ***Example:*** `--threads 12` + + +--- + `--tmp-dir TMP_DIR` +> **Max number of threads for each process.** +> *type: path* +> *default: `/lscratch/$SLURM_JOBID`* +> +> Path on the file system for writing temporary output files. By default, the temporary directory is set to '/lscratch/$SLURM_JOBID' for backwards compatibility with the NIH's Biowulf cluster; however, if you are running the pipeline on another cluster, this option will need to be specified. Ideally, this path should point to a dedicated location on the filesystem for writing tmp files. On many systems, this location is set to somewhere in /scratch. If you need to inject a variable into this string that should NOT be expanded, please quote this options value in single quotes. +> +> ***Example:*** `--tmp-dir /scratch/$USER/` + +### 2.4 Miscellaneous options +Each of the following arguments are optional, and do not need to be provided. + + `-h, --help` +> **Display Help.** +> *type: boolean flag* +> +> Shows command's synopsis, help message, and an example command +> +> ***Example:*** `--help` + +## 3. Example +```bash +# Step 1.) Grab an interactive node, +# do not run on head node! +srun -N 1 -n 1 --time=1:00:00 --mem=8gb --cpus-per-task=2 --pty bash +module purge +module load singularity snakemake + +# Step 2A.) Dry-run the pipeline +./baseline run --input .tests/*.R?.fastq.gz \ + --output /data/$USER/output \ + --mode slurm \ + --dry-run + +# Step 2B.) Run the baseline pipeline +# The slurm mode will submit jobs to +# the cluster. It is recommended running +# the pipeline in this mode. 
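# Optional: add --sif-cache /data/$USER/SIFs (see section 2.3 above) to reuse
# a shared container cache and avoid DockerHub pull-rate limits.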
+./baseline run --input .tests/*.R?.fastq.gz \ + --output /data/$USER/output \ + --mode slurm +``` \ No newline at end of file diff --git a/docs/usage/unlock.md b/docs/usage/unlock.md new file mode 100644 index 0000000..5a77d1b --- /dev/null +++ b/docs/usage/unlock.md @@ -0,0 +1,56 @@ +# baseline unlock + +## 1. About +The `baseline` executable is composed of several inter-related sub commands. Please see `baseline -h` for all available options. + +This part of the documentation describes options and concepts for baseline unlock sub command in more detail. With minimal configuration, the **`unlock`** sub command enables you to unlock a pipeline output directory. + +If the pipeline fails ungracefully, it maybe required to unlock the working directory before proceeding again. Snakemake will inform a user when it maybe necessary to unlock a working directory with an error message stating: `Error: Directory cannot be locked`. + +Please verify that the pipeline is not running before running this command. If the pipeline is currently running, the workflow manager will report the working directory is locked. The is the default behavior of snakemake, and it is normal. Do NOT run this command if the pipeline is still running! Please kill the master job and it's child jobs prior to running this command. + +Unlocking baseline pipeline output directory is fast and easy! In its most basic form, baseline unlock only has *one required input*. + +## 2. Synopsis +```text +$ ./baseline unlock [-h] --output OUTPUT +``` + +The synopsis for this command shows its parameters and their usage. Optional parameters are shown in square brackets. + +A user **must** provide an output directory to unlock via `--output` argument. After running the unlock sub command, you can resume the build or run pipeline from where it left off by re-running it. + +Use you can always use the `-h` option for information on a specific command. + +### 2.1 Required Arguments + + `--output OUTPUT` +> **Output directory to unlock.** +> *type: path* +> +> Path to a previous run's output directory. This will remove a lock on the working directory. Please verify that the pipeline is not running before running this command. +> ***Example:*** `--output /data/$USER/baseline_out` + +### 2.2 Options + +Each of the following arguments are optional and do not need to be provided. + + `-h, --help` +> **Display Help.** +> *type: boolean* +> +> Shows command's synopsis, help message, and an example command +> +> ***Example:*** `--help` + + +## 3. Example +```bash +# Step 0.) Grab an interactive node (do not run on head node) +srun -N 1 -n 1 --time=12:00:00 -p interactive --mem=8gb --cpus-per-task=4 --pty bash +module purge +module load singularity snakemake + +# Step 1.) 
Unlock a pipeline output directory +baseline unlock --output /data/$USER/output +``` \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml new file mode 100644 index 0000000..fa2c47d --- /dev/null +++ b/mkdocs.yml @@ -0,0 +1,107 @@ +# Project Information +site_name: baseline +site_author: Skyler Kuhn +site_description: >- + An awesome OpenOmics baseline pipeline + +# Repository +repo_name: OpenOmics/baseline +repo_url: https://github.com/OpenOmics/baseline +edit_uri: https://github.com/OpenOmics/baseline/edit/main/docs/ + +# Extra +extra_css: + - css/extra.css + +# Copyright +copyright: Copyright © 2023 OpenOmics + +# Configuration +theme: + name: material + features: + - navigation.tabs + - navigation.top + - toc.integrate + palette: + # Palette toggle for light mode + - scheme: default + toggle: + icon: material/lightbulb-on + name: Switch to dark mode + # Palette toggle for dark mode + - scheme: slate + toggle: + icon: material/weather-night + name: Switch to light mode + logo: assets/icons/doc-book.svg + favicon: assets/favicon/favicon.ico + +# Plugins +plugins: + - search + - git-revision-date + - minify: + minify_html: true + +# Customization +extra: + social: + - icon: fontawesome/solid/users + link: https://idss-bioinformatics.nih.gov/ + - icon: fontawesome/brands/github + link: https://github.com/OpenOmics + - icon: fontawesome/brands/docker + link: https://hub.docker.com/u/skchronicles + version: + provider: mike + +# Extensions +markdown_extensions: + - markdown.extensions.md_in_html + - markdown.extensions.admonition + - markdown.extensions.attr_list + - markdown.extensions.def_list + - markdown.extensions.footnotes + - markdown.extensions.meta + - markdown.extensions.toc: + permalink: true + - pymdownx.arithmatex: + generic: true + - pymdownx.betterem: + smart_enable: all + - pymdownx.caret + - pymdownx.critic + - pymdownx.details + - pymdownx.emoji: + emoji_index: !!python/name:materialx.emoji.twemoji + emoji_generator: !!python/name:materialx.emoji.to_svg + - pymdownx.highlight + - pymdownx.inlinehilite + - pymdownx.keys + - pymdownx.magiclink: + repo_url_shorthand: true + user: squidfunk + repo: mkdocs-material + - pymdownx.mark + - pymdownx.smartsymbols + - pymdownx.snippets: + check_paths: true + - pymdownx.superfences + - pymdownx.tabbed: + alternate_style: true + - pymdownx.tasklist: + custom_checkbox: true + - pymdownx.tilde + +# Page Tree +nav: + - About: index.md + - Commands: + - baseline run: usage/run.md + - baseline unlock: usage/unlock.md + - baseline install: usage/install.md + - baseline cache: usage/cache.md + - FAQ: + - General Questions: faq/questions.md + - License: license.md diff --git a/resources/README.md b/resources/README.md new file mode 100644 index 0000000..2ae24af --- /dev/null +++ b/resources/README.md @@ -0,0 +1,5 @@ +### Resources + +This folder, `resources/`, is meant to contain all resources necessary for running the workflow. This can be small reference files, such as reference sequences or small databases. This directory also contains utility scripts or wrappers to help facilitate running the pipeline. + +Whenever feasible, they can also be downloaded programmatically via rules defined in the pipeline. diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..0a5233c --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,16 @@ +import os, sys +# Makes relative imports to work in Python 3.6 +# without the need of '.' before the name of the +# package or py file. 
+# Allows for consistent syntax of relative imports +# across python2 and python3. +here = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(here) + +# Ground source of truth for version information +try: + # Import from root of project directory + version = open(os.path.join(here, 'VERSION'), 'r').readlines()[0].strip() +except IOError: + # When namespace is __main__ + version = open(os.path.join(here, '..', 'VERSION'), 'r').readlines()[0].strip() diff --git a/src/run.py b/src/run.py new file mode 100644 index 0000000..635c903 --- /dev/null +++ b/src/run.py @@ -0,0 +1,748 @@ +#!/usr/bin/env python3 +# -*- coding: UTF-8 -*- + +# Python standard library +from __future__ import print_function +from shutil import copytree +import os, re, json, sys, subprocess + +# Local imports +from utils import (git_commit_hash, + join_jsons, + fatal, + which, + exists, + err) + +from . import version as __version__ + + +def init(repo_path, output_path, links=[], required=['workflow', 'resources', 'config']): + """Initialize the output directory. If user provides a output + directory path that already exists on the filesystem as a file + (small chance of happening but possible), a OSError is raised. If the + output directory PATH already EXISTS, it will not try to create the directory. + @param repo_path : + Path to installation source code and its templates + @param output_path : + Pipeline output path, created if it does not exist + @param links list[]: + List of files to symlink into output_path + @param required list[]: + List of folder to copy over into output_path + """ + if not exists(output_path): + # Pipeline output directory + # does not exist on filesystem + os.makedirs(output_path) + + elif exists(output_path) and os.path.isfile(output_path): + # Provided Path for pipeline + # output directory exists as file + raise OSError("""\n\tFatal: Failed to create provided pipeline output directory! + User provided --output PATH already exists on the filesystem as a file. + Please run {} again with a different --output PATH. + """.format(sys.argv[0]) + ) + + # Copy over templates are other required resources + copy_safe(source = repo_path, target = output_path, resources = required) + + # Create renamed symlinks for each rawdata + # file provided as input to the pipeline + inputs = sym_safe(input_data = links, target = output_path) + + return inputs + + +def copy_safe(source, target, resources = []): + """Private function: Given a list paths it will recursively copy each to the + target location. If a target path already exists, it will NOT over-write the + existing paths data. + @param resources : + List of paths to copy over to target location + @params source : + Add a prefix PATH to each resource + @param target : + Target path to copy templates and required resources + """ + + for resource in resources: + destination = os.path.join(target, resource) + if not exists(destination): + # Required resources do not exist + copytree(os.path.join(source, resource), destination) + + +def sym_safe(input_data, target): + """Creates re-named symlinks for each FastQ file provided + as input. If a symlink already exists, it will not try to create a new symlink. + If relative source PATH is provided, it will be converted to an absolute PATH. 
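+ Example (hypothetical, based on the repo's test data): a provided input file named 'WT_S2_R1.fastq.gz' would be symlinked into the target directory as 'WT_S2.R1.fastq.gz' after passing through rename().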
+ @param input_data ]>: + List of input files to symlink to target location + @param target : + Target path to copy templates and required resources + @return input_fastqs list[]: + List of renamed input FastQs + """ + input_fastqs = [] # store renamed fastq file names + for file in input_data: + filename = os.path.basename(file) + renamed = os.path.join(target, rename(filename)) + input_fastqs.append(renamed) + + if not exists(renamed): + # Create a symlink if it does not already exist + # Follow source symlinks to resolve any binding issues + os.symlink(os.path.abspath(os.path.realpath(file)), renamed) + + return input_fastqs + + +def rename(filename): + """Dynamically renames FastQ file to have one of the following extensions: *.R1.fastq.gz, *.R2.fastq.gz + To automatically rename the fastq files, a few assumptions are made. If the extension of the + FastQ file cannot be infered, an exception is raised telling the user to fix the filename + of the fastq files. + @param filename : + Original name of file to be renamed + @return filename : + A renamed FastQ filename + """ + # Covers common extensions from SF, SRA, EBI, TCGA, and external sequencing providers + # key = regex to match string and value = how it will be renamed + extensions = { + # Matches: _R[12]_fastq.gz, _R[12].fastq.gz, _R[12]_fq.gz, etc. + ".R1.f(ast)?q.gz$": ".R1.fastq.gz", + ".R2.f(ast)?q.gz$": ".R2.fastq.gz", + # Matches: _R[12]_001_fastq_gz, _R[12].001.fastq.gz, _R[12]_001.fq.gz, etc. + # Capture lane information as named group + ".R1.(?P...).f(ast)?q.gz$": ".R1.fastq.gz", + ".R2.(?P...).f(ast)?q.gz$": ".R2.fastq.gz", + # Matches: _[12].fastq.gz, _[12].fq.gz, _[12]_fastq_gz, etc. + "_1.f(ast)?q.gz$": ".R1.fastq.gz", + "_2.f(ast)?q.gz$": ".R2.fastq.gz" + } + + if (filename.endswith('.R1.fastq.gz') or + filename.endswith('.R2.fastq.gz')): + # Filename is already in the correct format + return filename + + converted = False + for regex, new_ext in extensions.items(): + matched = re.search(regex, filename) + if matched: + # regex matches with a pattern in extensions + converted = True + filename = re.sub(regex, new_ext, filename) + break # only rename once + + if not converted: + raise NameError("""\n\tFatal: Failed to rename provided input '{}'! + Cannot determine the extension of the user provided input file. + Please rename the file list above before trying again. + Here is example of acceptable input file extensions: + sampleName.R1.fastq.gz sampleName.R2.fastq.gz + sampleName_R1_001.fastq.gz sampleName_R2_001.fastq.gz + sampleName_1.fastq.gz sampleName_2.fastq.gz + Please also check that your input files are gzipped? + If they are not, please gzip them before proceeding again. 
+ """.format(filename, sys.argv[0]) + ) + + return filename + + +def setup(sub_args, ifiles, repo_path, output_path): + """Setup the pipeline for execution and creates config file from templates + @param sub_args : + Parsed arguments for run sub-command + @param repo_path : + Path to installation or source code and its templates + @param output_path : + Pipeline output path, created if it does not exist + @return config : + Config dictionary containing metadata to run the pipeline + """ + # Check for mixed inputs, + # inputs which are a mixture + # of FastQ and BAM files + mixed_inputs(ifiles) + + # Resolves PATH to reference file + # template or a user generated + # reference genome built via build + # subcommand + genome_config = os.path.join(output_path,'config','genome.json') + # if sub_args.genome.endswith('.json'): + # Provided a custom reference genome generated by build pipline + # genome_config = os.path.abspath(sub_args.genome) + + required = { + # Base configuration file + "base": os.path.join(output_path,'config','config.json'), + # Template for project-level information + "project": os.path.join(output_path,'config','containers.json'), + # Template for genomic reference files + # User provided argument --genome is used to select the template + "genome": genome_config, + # Template for tool information + "tools": os.path.join(output_path,'config', 'modules.json'), + } + + # Create the global or master config + # file for pipeline, config.json + config = join_jsons(required.values()) # uses templates in config/*.json + config['project'] = {} + config = add_user_information(config) + config = add_rawdata_information(sub_args, config, ifiles) + + # Resolves if an image needs to be pulled + # from an OCI registry or a local SIF exists + config = image_cache(sub_args, config, output_path) + + # Add other runtime info for debugging + config['project']['version'] = __version__ + config['project']['workpath'] = os.path.abspath(sub_args.output) + git_hash = git_commit_hash(repo_path) + config['project']['git_commit_hash'] = git_hash # Add latest git commit hash + config['project']['pipeline_path'] = repo_path # Add path to installation + + # Add all cli options for data provenance + for opt, v in vars(sub_args).items(): + if opt == 'func': + # Pass over sub command's handler + continue + elif not isinstance(v, (list, dict)): + # CLI value can be converted to a string + v = str(v) + config['options'][opt] = v + + + return config + + +def unpacked(nested_dict): + """Generator to recursively retrieves all values in a nested dictionary. + @param nested_dict dict[]: + Nested dictionary to unpack + @yields value in dictionary + """ + # Iterate over all values of + # given dictionary + for value in nested_dict.values(): + # Check if value is of dict type + if isinstance(value, dict): + # If value is dict then iterate + # over all its values recursively + for v in unpacked(value): + yield v + else: + # If value is not dict type + # then yield the value + yield value + + +def get_fastq_screen_paths(fastq_screen_confs, match = 'DATABASE', file_index = -1): + """Parses fastq_screen.conf files to get the paths of each fastq_screen database. + This path contains bowtie2 indices for reference genome to screen against. + The paths are added as singularity bind points. 
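+ Example (hypothetical conf entry): a line such as 'DATABASE  Human  /data/fastq_screen/hg38' would contribute '/data/fastq_screen/hg38' (the last whitespace-separated field) to the returned list.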
+ @param fastq_screen_confs list[]: + Name of fastq_screen config files to parse + @param match : + Keyword to indicate a line match [default: 'DATABASE'] + @param file_index : + Index of line line containing the fastq_screen database path + @return list[]: + Returns a list of fastq_screen database paths + """ + databases = [] + for file in fastq_screen_confs: + with open(file, 'r') as fh: + for line in fh: + if line.startswith(match): + db_path = line.strip().split()[file_index] + databases.append(db_path) + return databases + + +def resolve_additional_bind_paths(search_paths): + """Finds additional singularity bind paths from a list of random paths. Paths are + indexed with a compostite key containing the first two directories of an absolute + file path to avoid issues related to shared names across the /gpfs shared network + filesystem. For each indexed list of file paths, a common path is found. Assumes + that the paths provided are absolute paths, the build sub command creates reference + files with absolute filenames. + @param search_paths list[]: + List of absolute file paths to find common bind paths from + @return common_paths list[]: + Returns a list of common shared file paths to create additional singularity bind paths + """ + common_paths = [] + indexed_paths = {} + + for ref in search_paths: + # Skip over resources with remote URI and + # skip over strings that are not file PATHS as + # build command creates absolute resource PATHS + if ref.lower().startswith('sftp://') or \ + ref.lower().startswith('s3://') or \ + ref.lower().startswith('gs://') or \ + not ref.lower().startswith(os.sep): + continue + + # Break up path into directory tokens + path_list = os.path.abspath(ref).split(os.sep) + try: # Create composite index from first two directories + # Avoids issues created by shared /gpfs/ PATHS + index = path_list[1:3] + index = tuple(index) + except IndexError: + index = path_list[1] # ref startswith / + if index not in indexed_paths: + indexed_paths[index] = [] + # Create an INDEX to find common PATHS for each root + # child directory like /scratch or /data. This prevents + # issues when trying to find the common path betweeen + # these two different directories (resolves to /) + indexed_paths[index].append(str(os.sep).join(path_list)) + + for index, paths in indexed_paths.items(): + # Find common paths for each path index + p = os.path.dirname(os.path.commonprefix(paths)) + if p == os.sep: + # Aviods adding / to bind list when + # given /tmp or /scratch as input + p = os.path.commonprefix(paths) + common_paths.append(p) + + return list(set(common_paths)) + + +def bind(sub_args, config): + """Resolves bindpaths for singularity/docker images. + @param sub_args : + Parsed arguments for run sub-command + @param configfile dict[]: + Config dictionary generated by setup command. 
+ @return bindpaths list[]: + List of singularity/docker bind paths + """ + bindpaths = [] + for value in unpacked(config): + if not isinstance(value, str): + continue + if exists(value): + if os.path.isfile(value): + value = os.path.dirname(value) + if value not in bindpaths: + bindpaths.append(value) + + # Bind input file paths, working + # directory, and other reference + # genome paths + rawdata_bind_paths = [os.path.realpath(p) for p in config['project']['datapath'].split(',')] + working_directory = os.path.realpath(config['project']['workpath']) + genome_bind_paths = resolve_additional_bind_paths(bindpaths) + bindpaths = [working_directory] + rawdata_bind_paths + genome_bind_paths + bindpaths = list(set([p for p in bindpaths if p != os.sep])) + + return bindpaths + + +def mixed_inputs(ifiles): + """Check if a user has provided a set of input files which contain a + mixture of FastQ and BAM files. The pipeline does not support processing + a mix of FastQ and BAM files. + @params ifiles list[]: + List containing pipeline input files (renamed symlinks) + """ + bam_files, fq_files = [], [] + fastqs = False + bams = False + for file in ifiles: + if file.endswith('.R1.fastq.gz') or file.endswith('.R2.fastq.gz'): + fastqs = True + fq_files.append(file) + elif file.endswith('.bam'): + bams = True + bam_files.append(file) + + if fastqs and bams: + # User provided a mix of FastQs and BAMs + raise TypeError("""\n\tFatal: Detected a mixture of --input data types. + A mixture of BAM and FastQ files were provided; however, the pipeline + does NOT support processing a mixture of input FastQ and BAM files. + Input FastQ Files: + {} + Input BAM Files: + {} + Please do not run the pipeline with a mixture of FastQ and BAM files. + This feature is currently not supported within '{}', and it is not + recommended to process samples in this way either. If this is a priority + for your project, please run the set of FastQ and BAM files separately + (in two separate output directories). If you feel like this functionality + should exist, feel free to open an issue on Github. + """.format(" ".join(fq_files), " ".join(bam_files), sys.argv[0]) + ) + +def add_user_information(config): + """Adds username and user's home directory to config. + @params config : + Config dictionary containing metadata to run pipeline + @return config : + Updated config dictionary containing user information (username and home directory) + """ + # Get PATH to user's home directory + # Method is portable across unix-like + # OS and Windows + home = os.path.expanduser("~") + + # Get username from home directory PATH + username = os.path.split(home)[-1] + + # Update config with home directory and + # username + config['project']['userhome'] = home + config['project']['username'] = username + + return config + + +def add_sample_metadata(input_files, config, group=None): + """Adds sample metadata such as sample basename, label, and group information. + If sample sheet is provided, it will default to using information in that file. + If no sample sheet is provided, it will only add sample basenames and labels. 
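+ Example (hypothetical path): an input file '/output/WT_S1.R1.fastq.gz' adds the basename 'WT_S1' to config['samples']; its mate, WT_S1.R2.fastq.gz, does not add a duplicate entry.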
+ @params input_files list[]: + List containing pipeline input fastq files + @params config : + Config dictionary containing metadata to run pipeline + @params group : + Sample sheet containing basename, group, and label for each sample + @return config : + Updated config with basenames, labels, and groups (if provided) + """ + import re + + # TODO: Add functionality for basecase + # when user has samplesheet + added = [] + config['samples'] = [] + for file in input_files: + # Split sample name on file extension + sample = re.split('\.R[12]\.fastq\.gz', os.path.basename(file))[0] + if sample not in added: + # Only add PE sample information once + added.append(sample) + config['samples'].append(sample) + + return config + + +def add_rawdata_information(sub_args, config, ifiles): + """Adds information about rawdata provided to pipeline. + Determines whether the dataset is paired-end or single-end and finds the set of all + rawdata directories (needed for -B option when running singularity). If a user provides + paired-end data, checks to see if both mates (R1 and R2) are present for each sample. + @param sub_args : + Parsed arguments for run sub-command + @params ifiles list[]: + List containing pipeline input files (renamed symlinks) + @params config : + Config dictionary containing metadata to run pipeline + @return config : + Updated config dictionary containing user information (username and home directory) + """ + + # Determine whether dataset is paired-end + # or single-end + # Updates config['project']['nends'] where + # 1 = single-end, 2 = paired-end, -1 = bams + convert = {1: 'single-end', 2: 'paired-end', -1: 'bam'} + nends = get_nends(ifiles) # Checks PE data for both mates (R1 and R2) + config['project']['nends'] = nends + config['project']['filetype'] = convert[nends] + + # Finds the set of rawdata directories to bind + rawdata_paths = get_rawdata_bind_paths(input_files = sub_args.input) + config['project']['datapath'] = ','.join(rawdata_paths) + + # Add each sample's basename + config = add_sample_metadata(input_files = ifiles, config = config) + + return config + + +def image_cache(sub_args, config, repo_path): + """Adds Docker Image URIs, or SIF paths to config if singularity cache option is provided. + If singularity cache option is provided and a local SIF does not exist, a warning is + displayed and the image will be pulled from URI in 'config/containers.json'. 
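+ Example (hypothetical URI): with --sif-cache provided, an image URI like 'docker://org/tool:v0.1.0' maps to 'tool_v0.1.0.sif' inside the cache directory; if that SIF exists, the config entry is re-pointed to the local file, otherwise a warning is printed and the registry URI is kept.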
+ @param sub_args : + Parsed arguments for run sub-command + @params config : + Docker Image config file + @param repo_path : + Path to installation or source code and its templates + @return config : + Updated config dictionary containing user information (username and home directory) + """ + images = os.path.join(repo_path, 'config','containers.json') + + # Read in config for docker image uris + with open(images, 'r') as fh: + data = json.load(fh) + # Check if local sif exists + for image, uri in data['images'].items(): + if sub_args.sif_cache: + sif = os.path.join(sub_args.sif_cache, '{}.sif'.format(os.path.basename(uri).replace(':', '_'))) + if not exists(sif): + # If local sif does not exist on in cache, + # print warning and default to pulling from + # URI in config/containers.json + print('Warning: Local image "{}" does not exist in singularity cache'.format(sif), file=sys.stderr) + else: + # Change pointer to image from Registry URI + # to local SIF + data['images'][image] = sif + + config.update(data) + + return config + + +def get_nends(ifiles): + """Determines whether the dataset is paired-end or single-end. + If paired-end data, checks to see if both mates (R1 and R2) are present for each sample. + If single-end, nends is set to 1. Else if paired-end, nends is set to 2. + @params ifiles list[]: + List containing pipeline input files (renamed symlinks) + @return nends_status : + Integer reflecting nends status: 1 = se, 2 = pe, -1 = bams + """ + # Determine if dataset contains paired-end data + paired_end = False + bam_files = False + nends_status = 1 + for file in ifiles: + if file.endswith('.bam'): + bam_files = True + nends_status = -1 + break + elif file.endswith('.R2.fastq.gz'): + paired_end = True + nends_status = 2 + break # dataset is paired-end + + # Check to see if both mates (R1 and R2) + # are present paired-end data + if paired_end: + nends = {} # keep count of R1 and R2 for each sample + for file in ifiles: + # Split sample name on file extension + sample = re.split('\.R[12]\.fastq\.gz', os.path.basename(file))[0] + if sample not in nends: + nends[sample] = 0 + + nends[sample] += 1 + + # Check if samples contain both read mates + missing_mates = [sample for sample, count in nends.items() if count == 1] + if missing_mates: + # Missing an R1 or R2 for a provided input sample + raise NameError("""\n\tFatal: Detected pair-end data but user failed to provide + both mates (R1 and R2) for the following samples:\n\t\t{}\n + Please check that the basename for each sample is consistent across mates. + Here is an example of a consistent basename across mates: + consistent_basename.R1.fastq.gz + consistent_basename.R2.fastq.gz + + Please do not run the pipeline with a mixture of single-end and paired-end + samples. This feature is currently not supported within {}, and it is + not recommended either. If this is a priority for your project, please run + paired-end samples and single-end samples separately (in two separate output + directories). If you feel like this functionality should exist, feel free to + open an issue on Github. + """.format(missing_mates, sys.argv[0]) + ) + elif not bam_files: + # Provided only single-end data + # not supported or recommended + raise TypeError("""\n\tFatal: Single-end data detected. + {} does not support single-end data. Calling variants from single-end + data is not recommended either. If you feel like this functionality should + exist, feel free to open an issue on Github. 
+ """.format(sys.argv[0]) + ) + + return nends_status + + +def get_rawdata_bind_paths(input_files): + """ + Gets rawdata bind paths of user provided fastq files. + @params input_files list[]: + List containing user-provided input fastq files + @return bindpaths : + Set of rawdata bind paths + """ + bindpaths = [] + for file in input_files: + # Get directory of input file + rawdata_src_path = os.path.dirname(os.path.abspath(os.path.realpath(file))) + if rawdata_src_path not in bindpaths: + bindpaths.append(rawdata_src_path) + + return bindpaths + + +def dryrun(outdir, config='config.json', snakefile=os.path.join('workflow', 'Snakefile')): + """Dryruns the pipeline to ensure there are no errors prior to runnning. + @param outdir : + Pipeline output PATH + @return dryrun_output : + Byte string representation of dryrun command + """ + try: + # Setting cores to dummy high number so + # displays the true number of cores a rule + # will use, it uses the min(--cores CORES, N) + dryrun_output = subprocess.check_output([ + 'snakemake', '-npr', + '-s', str(snakefile), + '--use-singularity', + '--rerun-incomplete', + '--cores', str(256), + '--configfile={}'.format(config) + ], cwd = outdir, + stderr=subprocess.STDOUT) + except OSError as e: + # Catch: OSError: [Errno 2] No such file or directory + # Occurs when command returns a non-zero exit-code + if e.errno == 2 and not which('snakemake'): + # Failure caused because snakemake is NOT in $PATH + err('\n\x1b[6;37;41mError: Are snakemake AND singularity in your $PATH?\x1b[0m') + fatal('\x1b[6;37;41mPlease check before proceeding again!\x1b[0m') + else: + # Failure caused by unknown cause, raise error + raise e + except subprocess.CalledProcessError as e: + print(e, e.output.decode("utf-8")) + raise(e) + + return dryrun_output + + +def runner(mode, outdir, alt_cache, logger, additional_bind_paths = None, + threads=2, jobname='pl:master', submission_script='run.sh', + tmp_dir = '/lscratch/$SLURM_JOBID/'): + """Runs the pipeline via selected executor: local or slurm. + If 'local' is selected, the pipeline is executed locally on a compute node/instance. + If 'slurm' is selected, jobs will be submited to the cluster using SLURM job scheduler. + Support for additional job schedulers (i.e. PBS, SGE, LSF) may be added in the future. + @param outdir : + Pipeline output PATH + @param mode : + Execution method or mode: + local runs serially a compute instance without submitting to the cluster. + slurm will submit jobs to the cluster using the SLURM job scheduler. + @param additional_bind_paths : + Additional paths to bind to container filesystem (i.e. 
input file paths) + @param alt_cache : + Alternative singularity cache location + @param logger : + An open file handle for writing + @param threads : + Number of threads to use for local execution method + @param masterjob : + Name of the master job + @return masterjob : + """ + # Add additional singularity bind PATHs + # to mount the local filesystem to the + # containers filesystem, NOTE: these + # PATHs must be an absolute PATHs + outdir = os.path.abspath(outdir) + # Add any default PATHs to bind to + # the container's filesystem, like + # tmp directories, /lscratch + addpaths = [] + temp = os.path.dirname(tmp_dir.rstrip('/')) + if temp == os.sep: + temp = tmp_dir.rstrip('/') + if outdir not in additional_bind_paths.split(','): + addpaths.append(outdir) + if temp not in additional_bind_paths.split(','): + addpaths.append(temp) + bindpaths = ','.join(addpaths) + + # Set ENV variable 'SINGULARITY_CACHEDIR' + # to output directory + my_env = {}; my_env.update(os.environ) + cache = os.path.join(outdir, ".singularity") + my_env['SINGULARITY_CACHEDIR'] = cache + if alt_cache: + # Override the pipeline's default + # cache location + my_env['SINGULARITY_CACHEDIR'] = alt_cache + cache = alt_cache + + if additional_bind_paths: + # Add Bind PATHs for outdir and tmp dir + if bindpaths: + bindpaths = ",{}".format(bindpaths) + bindpaths = "{}{}".format(additional_bind_paths,bindpaths) + + if not exists(os.path.join(outdir, 'logfiles')): + # Create directory for logfiles + os.makedirs(os.path.join(outdir, 'logfiles')) + + # Create .singularity directory for + # installations of snakemake without + # setuid which creates a sandbox in + # the SINGULARITY_CACHEDIR + if not exists(cache): + # Create directory for sandbox + # and image layers + os.makedirs(cache) + + # Run on compute node or instance + # without submitting jobs to a scheduler + if mode == 'local': + # Run pipeline's main process + # Look into later: it maybe worth + # replacing Popen subprocess with a direct + # snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html + masterjob = subprocess.Popen([ + 'snakemake', '-pr', '--rerun-incomplete', + '--use-singularity', + '--singularity-args', "'-B {}'".format(bindpaths), + '--cores', str(threads), + '--configfile=config.json' + ], cwd = outdir, stderr=subprocess.STDOUT, stdout=logger, env=my_env) + + # Submitting jobs to cluster via SLURM's job scheduler + elif mode == 'slurm': + # Run pipeline's main process + # Look into later: it maybe worth + # replacing Popen subprocess with a direct + # snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html + # CLUSTER_OPTS="'sbatch --gres {cluster.gres} --cpus-per-task {cluster.threads} -p {cluster.partition} \ + # -t {cluster.time} --mem {cluster.mem} --job-name={params.rname} -e $SLURMDIR/slurm-%j_{params.rname}.out \ + # -o $SLURMDIR/slurm-%j_{params.rname}.out'" + # sbatch --parsable -J "$2" --gres=lscratch:500 --time=5-00:00:00 --mail-type=BEGIN,END,FAIL \ + # --cpus-per-task=32 --mem=96g --output "$3"/logfiles/snakemake.log --error "$3"/logfiles/snakemake.log \ + # snakemake --latency-wait 120 -s "$3"/workflow/Snakefile -d "$3" \ + # --use-singularity --singularity-args "'-B $4'" --configfile="$3"/config.json \ + # --printshellcmds --cluster-config "$3"/resources/cluster.json \ + # --cluster "${CLUSTER_OPTS}" --keep-going --restart-times 3 -j 500 \ + # --rerun-incomplete --stats "$3"/logfiles/runtime_statistics.json \ + # --keep-remote --local-cores 30 2>&1 | tee -a 
"$3"/logfiles/master.log + masterjob = subprocess.Popen([ + str(submission_script), mode, + '-j', jobname, '-b', str(bindpaths), + '-o', str(outdir), '-c', str(cache), + '-t', "'{}'".format(tmp_dir) + ], cwd = outdir, stderr=subprocess.STDOUT, stdout=logger, env=my_env) + + return masterjob diff --git a/src/run.sh b/src/run.sh new file mode 100755 index 0000000..1c3bb06 --- /dev/null +++ b/src/run.sh @@ -0,0 +1,291 @@ +#!/usr/bin/env bash +set -eu + +function usage() { cat << EOF +run.sh: submits the master job of pipeline to the job scheduler. +USAGE: + run.sh [-h] [-c CACHE] \\ + -o OUTDIR \\ + -j MASTER_JOB_NAME \\ + -b SINGULARITY_BIND_PATHS \\ + -t TMP_DIR +SYNOPSIS: + This script creates/submits the pipeline's master job to the +cluster. The master job acts as the pipeline's main controller or +its main process. This main job dictates how subsequent jobs are +submitted to the cluster via the job scheduler, SLURM. Support for +additional job schedulers (i.e. PBS, SGE, LSF, Tibanna) may be added +in the future. + The main entry point of the pipeline calls this job submission +wrapper script. As so, this script can be used to by-pass a previously +failed run; meaning, it can be used to re-run the pipeline to pick back +off where the last failure occurred or re-start the pipeline. + Please Note: it is highly recommended to use the main entry point of +the pipeline instead of directly invoking this script. As so, please use +main entry point of the pipeline. If you are experiencing an error, it +maybe due to improperly mounting singularity bind paths which the main +entry point will internally handle. Only advanced users should directly +invoke this script. +Required Positional Argument: + [1] MODE [Type: Str] Defines the snakemake execution mode. + Valid mode options include: + slurm: uses slurm and singularity/snakemake + backend. This EXECUTOR will submit child + jobs to the cluster. It is recommended + running the pipeline in this mode, as most + of the steps are computationally intensive. +Required Arguments: + -o, --outdir [Type: Path] Path to output directory of the pipeline. + This is the pipeline's working directory + where all output files will be generated. + -j, --job-name [Type: Str] Name of pipeline's master job. + -b, --bind-paths [Type:Path] Singularity bind paths. The pipeline uses + singularity images for exection. Bind + paths are used to mount the host file + system to the container's file system. + Multiple bind paths can be provided as + a comma seperated list. The main entry + point of the pipeline internally collects + and aggregates bind paths to mount to the + container's filesystem. + If you are manually running this script + or by-passing the main entry point, you + will need to provide the bind paths of + the rawdata directory(s) along with the + pipeline's output directory and any other + directories for reference files. Please see + example usage below. + -t, --tmp-dir [Type:Path] Temporary directory. The pipeline generates + intermediate, temporary output files. Any + temporary output files will be written to + this location. On Biowulf, it should be + set to '/lscratch/\$SLURM_JOBID/'. On FRCE, + this value should be set to the following: + '/scratch/cluster_scratch/\$USER/'. +OPTIONS: + -c, --cache [Type: Path] Path to singularity cache. If not provided, + the path will default to the current working + directory of this script. + [Default: $(dirname "$0")/.singularity/] + -h, --help [Type: Bool] Displays usage and help information. 
+Example: + $ runner slurm -h + $ runner slurm -j mjobid -b "/data/$USER/,/lscratch" +Version: + 1.0.0 +EOF +} + + +# Functions +function err() { cat <<< "$@" 1>&2; } +function fatal() { cat <<< "$@" 1>&2; usage; exit 1; } +function abspath() { readlink -e "$1"; } +function parser() { + # Adds parsed command-line args to GLOBAL $Arguments associative array + # + KEYS = short_cli_flag ("j", "o", ...) + # + VALUES = parsed_user_value ("MasterJobName" "/scratch/hg38", ...) + # @INPUT "$@" = user command-line arguments + # @CALLS check() to see if the user provided all the required arguments + + while [[ $# -gt 0 ]]; do + key="$1" + case $key in + -h | --help) usage && exit 0;; + -j | --job-name) provided "$key" "${2:-}"; Arguments["j"]="$2"; shift; shift;; + -b | --bind-paths) provided "$key" "${2:-}"; Arguments["b"]="$2"; shift; shift;; + -t | --tmp-dir) provided "$key" "${2:-}"; Arguments["t"]="$2"; shift; shift;; + -o | --outdir) provided "$key" "${2:-}"; Arguments["o"]="$2"; shift; shift;; + -c | --cache) provided "$key" "${2:-}"; Arguments["c"]="$2"; shift; shift;; + -* | --*) err "Error: Failed to parse unsupported argument: '${key}'."; usage && exit 1;; + *) err "Error: Failed to parse unrecognized argument: '${key}'. Do any of your inputs have spaces?"; usage && exit 1;; + esac + done + + # Check for required args + check +} + + +function provided() { + # Checks to see if the argument's value exists + # @INPUT $1 = name of user provided argument + # @INPUT $2 = value of user provided argument + # @CALLS fatal() if value is empty string or NULL + + if [[ -z "${2:-}" ]]; then + fatal "Fatal: Failed to provide value to '${1}'!"; + fi +} + + +function check(){ + # Checks to see if user provided required arguments + # @INPUTS $Arguments = Global Associative Array + # @CALLS fatal() if user did NOT provide all the $required args + + # List of required arguments + local required=("j" "b" "o") + #echo -e "Provided Required Inputs" + for arg in "${required[@]}"; do + value=${Arguments[${arg}]:-} + if [[ -z "${value}" ]]; then + fatal "Failed to provide all required args.. missing ${arg}" + fi + done +} + + +function require(){ + # Requires an executable is in $PATH, as a last resort it will attempt to load + # the executable or dependency as a module + # @INPUT $@ = List of dependencies or executables to check + + for exe in "${@}"; do + # Check if executable is in $PATH + command -V ${exe} &> /dev/null && continue; + # Try to load exe as lua module + module load ${exe} &> /dev/null || \ + fatal "Failed to find or load '${exe}', not installed on target system." + done +} + + +function submit(){ + # Submit jobs to the defined job scheduler or executor (i.e. 
slurm) + # INPUT $1 = Snakemake Mode of execution + # INPUT $2 = Name of master/main job or process (pipeline controller) + # INPUT $3 = Pipeline output directory + # INPUT $4 = Singularity Bind paths + # INPUT $5 = Singularity cache directory + # INPUT $6 = Temporary directory for output files + + # Check if singularity and snakemake are in $PATH + # If not, try to module load singularity as a last resort + require singularity snakemake + + # Snakemake executor, or target job scheduler + # more maybe added in the future, TBA + executor=${1} + + # Goto Pipeline Ouput directory + # Create a local singularity cache in output directory + # cache can be re-used instead of re-pulling from DockerHub everytime + cd "$3" && export SINGULARITY_CACHEDIR="${5}" + + # unsetting XDG_RUNTIME_DIR to avoid some unsighly but harmless warnings + unset XDG_RUNTIME_DIR + + # Run the workflow with specified executor + case "$executor" in + slurm) + # Create directory for logfiles + mkdir -p "$3"/logfiles/slurmfiles/ + # Submit the master job to the cluster + # sbatch --parsable -J {jobname} --time=5-00:00:00 --mail-type=BEGIN,END,FAIL + # --cpus-per-task=24 --mem=96g --gres=lscratch:500 + # --output os.path.join({outdir}, 'logfiles', 'snakemake.log') --error os.path.join({outdir}, 'logfiles', 'snakemake.log') + # snakemake -pr --latency-wait 120 -d {outdir} --configfile=config.json + # --cluster-config os.path.join({outdir}, 'config', 'cluster.json') + # --cluster {CLUSTER_OPTS} --stats os.path.join({outdir}, 'logfiles', 'runtime_statistics.json') + # --printshellcmds --keep-going --rerun-incomplete + # --keep-remote --restart-times 3 -j 500 --use-singularity + # --singularity-args -B {}.format({bindpaths}) --local-cores 24 + SLURM_DIR="$3/logfiles/slurmfiles" + CLUSTER_OPTS="sbatch --gres {cluster.gres} --cpus-per-task {cluster.threads} -p {cluster.partition} -t {cluster.time} --mem {cluster.mem} --job-name={params.rname} -e $SLURM_DIR/slurm-%j_{params.rname}.out -o $SLURM_DIR/slurm-%j_{params.rname}.out" + # Check if NOT running on Biowulf + # Assumes other clusters do NOT + # have GRES for local node disk, + # long term it might be worth + # adding a new option to allow + # a user to decide whether to + # use GRES at job submission, + # trying to infer this because + # most users will not even know + # what GRES is and how or why + # it should be used and by default + # SLURM is not configured to use + # GRES, remove prefix single quote + if [[ ${6#\'} != /lscratch* ]]; then + CLUSTER_OPTS="sbatch --cpus-per-task {cluster.threads} -p {cluster.partition} -t {cluster.time} --mem {cluster.mem} --job-name={params.rname} -e $SLURM_DIR/slurm-%j_{params.rname}.out -o $SLURM_DIR/slurm-%j_{params.rname}.out" + fi + # Create sbacth script to build index + cat << EOF > kickoff.sh +#!/usr/bin/env bash +#SBATCH --cpus-per-task=16 +#SBATCH --mem=96g +#SBATCH --time=5-00:00:00 +#SBATCH --parsable +#SBATCH -J "$2" +#SBATCH --mail-type=BEGIN,END,FAIL +#SBATCH --output "$3/logfiles/snakemake.log" +#SBATCH --error "$3/logfiles/snakemake.log" +set -euo pipefail +# Main process of pipeline +snakemake --latency-wait 120 -s "$3/workflow/Snakefile" -d "$3" \\ + --use-singularity --singularity-args "'-B $4'" \\ + --use-envmodules --configfile="$3/config.json" \\ + --printshellcmds --cluster-config "$3/config/cluster.json" \\ + --cluster "${CLUSTER_OPTS}" --keep-going --restart-times 3 -j 500 \\ + --rerun-incomplete --stats "$3/logfiles/runtime_statistics.json" \\ + --keep-remote --local-cores 14 2>&1 +# Create summary report 
+snakemake -d "$3" --report "Snakemake_Report.html" +EOF + chmod +x kickoff.sh + job_id=$(sbatch kickoff.sh | tee -a "$3"/logfiles/master.log) + ;; + *) echo "${executor} is not available." && \ + fatal "Failed to provide valid execution backend: ${executor}. Please use slurm." + ;; + esac + + # Return exit-code of pipeline sumbission + echo "$job_id" +} + + +function main(){ + # Parses args and submits master job of pipeline to the target job scheduler + # @INPUT "$@" = command-line arguments + # @CALLS parser(), initialize(), setup(), cromwell() + + if [ $# -eq 0 ]; then usage; exit 1; fi + + # Associative array to store parsed args + declare -Ag Arguments + + # Positional Argument for Snakemake Executor + case $1 in + slurm) Arguments["e"]="$1";; + -h | --help | help) usage && exit 0;; + -* | --*) err "Error: Failed to provide required positional argument: ."; usage && exit 1;; + *) err "Error: Failed to provide valid positional argument. '${1}' is not supported. Valid option(s) are slurm"; usage && exit 1;; + esac + + # Parses remaining user provided command-line arguments + parser "${@:2}" # Remove first item of list + outdir="$(abspath "$(dirname "${Arguments[o]}")")" + Arguments[o]="${Arguments[o]%/}" # clean outdir path (remove trailing '/') + + # Setting defaults for non-required arguments + # If singularity cache not provided, default to ${outdir}/.singularity + cache="${Arguments[o]}/.singularity" + Arguments[c]="${Arguments[c]:-$cache}" + Arguments[c]="${Arguments[c]%/}" # clean outdir path (remove trailing '/') + + # Print pipeline metadata prior to running + echo -e "[$(date)] Running pipeline with the following parameters:" + for key in "${!Arguments[@]}"; do echo -e "\t${key}\t${Arguments["$key"]}"; done + + # Run pipeline and submit jobs to cluster using the defined executor + mkdir -p "${Arguments[o]}/logfiles/" + job_id=$(submit "${Arguments[e]}" "${Arguments[j]}" "${Arguments[o]}" "${Arguments[b]}" "${Arguments[c]}" "${Arguments[t]}") + echo -e "[$(date)] Pipeline submitted to cluster.\nMaster Job ID: $job_id" + echo "${job_id}" > "${Arguments[o]}/logfiles/mjobid.log" + +} + + +# Main: check usage, parse args, and run pipeline +main "$@" diff --git a/src/shells.py b/src/shells.py new file mode 100644 index 0000000..2fd2de1 --- /dev/null +++ b/src/shells.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 +# -*- coding: UTF-8 -*- + +# Python standard library +from __future__ import print_function +from subprocess import CalledProcessError +import os, subprocess + +# Local imports +from utils import fatal, err + + +def set_options(strict): + """ + Changes behavior of default shell and get overrides options + to run bash in a strict mode. + @param strict : + Overrides default shell options and runs shell in strict or + less permissive mode. + @return prefix : + Returns overrides options to run bash in a strict mode + """ + prefix = '' # permissive shell option + if strict: + # Changes behavior of default shell + # set -e: exit immediately upon error + # set -u: treats unset variables as an error + # set -o pipefail: exits if a error occurs in any point of a pipeline + prefix = 'set -euo pipefail; ' + + return prefix + + +def bash(cmd, interpreter='/bin/bash', strict=set_options(True), cwd=os.getcwd(), **kwargs): + """ + Interface to run a process or bash command. Using subprocess.call_check() + due to portability across most python versions. It was introduced in python 2.5 + and it is also interoperabie across all python 3 versions. 
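+ Example (behavior sketch): bash('echo hello') returns 0, while bash('exit 1') prints a warning to standard error and returns 1.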
+ @param cmd : + Shell command to run + @param interpreter : + Interpreter for command to run [default: bash] + @pararm strict : + Prefixes any command with 'set -euo pipefail' to ensure process fail with + the expected exit-code + @params kwargs : + Keyword arguments to modify subprocess.check_call() behavior + @return exitcode : + Returns the exit code of the run command, failures return non-zero exit codes + """ + try: + exitcode = subprocess.check_call(strict + cmd, + shell=True, + executable=interpreter, + cwd=cwd, + **kwargs + ) + except CalledProcessError as e: + exitcode = e.returncode + err("""WARNING: Failed to run '{}' command! + └── Command returned a non-zero exitcode of '{}'.""".format(strict + cmd, exitcode) + ) + + return exitcode + + +if __name__ == '__main__': + # Tests + bash('ls -la /home/') + bash('ls -la /fake/dne/path') diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 0000000..9ef48d8 --- /dev/null +++ b/src/utils.py @@ -0,0 +1,367 @@ +#!/usr/bin/env python3 +# -*- coding: UTF-8 -*- + +# Python standard library +from __future__ import print_function +from shutil import copytree +import os, sys, hashlib +import subprocess, json + + +def md5sum(filename, first_block_only = False, blocksize = 65536): + """Gets md5checksum of a file in memory-safe manner. + The file is read in blocks/chunks defined by the blocksize parameter. This is + a safer option to reading the entire file into memory if the file is very large. + @param filename : + Input file on local filesystem to find md5 checksum + @param first_block_only : + Calculate md5 checksum of the first block/chunk only + @param blocksize : + Blocksize of reading N chunks of data to reduce memory profile + @return hasher.hexdigest() : + MD5 checksum of the file's contents + """ + hasher = hashlib.md5() + with open(filename, 'rb') as fh: + buf = fh.read(blocksize) + if first_block_only: + # Calculate MD5 of first block or chunck of file. + # This is a useful heuristic for when potentially + # calculating an MD5 checksum of thousand or + # millions of file. + hasher.update(buf) + return hasher.hexdigest() + while len(buf) > 0: + # Calculate MD5 checksum of entire file + hasher.update(buf) + buf = fh.read(blocksize) + + return hasher.hexdigest() + + +def permissions(parser, path, *args, **kwargs): + """Checks permissions using os.access() to see the user is authorized to access + a file/directory. Checks for existence, readability, writability and executability via: + os.F_OK (tests existence), os.R_OK (tests read), os.W_OK (tests write), os.X_OK (tests exec). + @param parser : + Argparse parser object + @param path : + Name of path to check + @return path : + Returns abs path if it exists and permissions are correct + """ + if not exists(path): + parser.error("Path '{}' does not exists! Failed to provide vaild input.".format(path)) + if not os.access(path, *args, **kwargs): + parser.error("Path '{}' exists, but cannot read path due to permissions!".format(path)) + + return os.path.abspath(path) + + +def standard_input(parser, path, *args, **kwargs): + """Checks for standard input when provided or permissions using permissions(). 
+ @param parser : + Argparse parser object + @param path : + Name of path to check + @return path : + If path exists and user can read from location + """ + # Checks for standard input + if not sys.stdin.isatty(): + # Standard input provided, set path as an + # empty string to prevent searching of '-' + path = '' + return path + + # Checks for positional arguments as paths + path = permissions(parser, path, *args, **kwargs) + + return path + + +def exists(testpath): + """Checks if file exists on the local filesystem. + @param parser : + argparse parser object + @param testpath : + Name of file/directory to check + @return does_exist : + True when file/directory exists, False when file/directory does not exist + """ + does_exist = True + if not os.path.exists(testpath): + does_exist = False # File or directory does not exist on the filesystem + + return does_exist + + +def ln(files, outdir): + """Creates symlinks for files to an output directory. + @param files list[]: + List of filenames + @param outdir : + Destination or output directory to create symlinks + """ + # Create symlinks for each file in the output directory + for file in files: + ln = os.path.join(outdir, os.path.basename(file)) + if not exists(ln): + os.symlink(os.path.abspath(os.path.realpath(file)), ln) + + +def which(cmd, path=None): + """Checks if an executable is in $PATH + @param cmd : + Name of executable to check + @param path : + Optional list of PATHs to check [default: $PATH] + @return : + True if exe in PATH, False if not in PATH + """ + if path is None: + path = os.environ["PATH"].split(os.pathsep) + + for prefix in path: + filename = os.path.join(prefix, cmd) + executable = os.access(filename, os.X_OK) + is_not_directory = os.path.isfile(filename) + if executable and is_not_directory: + return True + return False + + +def err(*message, **kwargs): + """Prints any provided args to standard error. + kwargs can be provided to modify print functions + behavior. + @param message : + Values printed to standard error + @params kwargs + Key words to modify print function behavior + """ + print(*message, file=sys.stderr, **kwargs) + + + +def fatal(*message, **kwargs): + """Prints any provided args to standard error + and exits with an exit code of 1. + @param message : + Values printed to standard error + @params kwargs + Key words to modify print function behavior + """ + err(*message, **kwargs) + sys.exit(1) + + +def require(cmds, suggestions, path=None): + """Enforces an executable is in $PATH + @param cmds list[]: + List of executable names to check + @param suggestions list[]: + Name of module to suggest loading for a given index + in param cmd. + @param path list[]]: + Optional list of PATHs to check [default: $PATH] + """ + error = False + for i in range(len(cmds)): + available = which(cmds[i]) + if not available: + c = Colors + error = True + err("""\n{}{}Fatal: {} is not in $PATH and is required during runtime!{} + └── Possible solution: please 'module load {}' and run again!""".format( + c.bg_red, c.white, cmds[i], c.end, suggestions[i]) + ) + + if error: fatal() + + return + + +def safe_copy(source, target, resources = []): + """Private function: Given a list paths it will recursively copy each to the + target location. If a target path already exists, it will NOT over-write the + existing paths data. 
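+ Example (hypothetical paths): safe_copy('/repo', '/out', resources=['workflow', 'config']) copies '/repo/workflow' and '/repo/config' into '/out', skipping any destination that already exists.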
+ @param resources : + List of paths to copy over to target location + @params source : + Add a prefix PATH to each resource + @param target : + Target path to copy templates and required resources + """ + + for resource in resources: + destination = os.path.join(target, resource) + if not exists(destination): + # Required resources do not exist + copytree(os.path.join(source, resource), destination) + + +def git_commit_hash(repo_path): + """Gets the git commit hash of the repo. + @param repo_path : + Path to git repo + @return githash : + Latest git commit hash + """ + try: + githash = subprocess.check_output(['git', 'rev-parse', 'HEAD'], stderr=subprocess.STDOUT, cwd = repo_path).strip().decode('utf-8') + # Typecast to fix python3 TypeError (Object of type bytes is not JSON serializable) + # subprocess.check_output() returns a byte string + githash = str(githash) + except Exception as e: + # Github releases are missing the .git directory, + # meaning you cannot get a commit hash, set the + # commit hash to indicate its from a GH release + githash = 'github_release' + return githash + + +def join_jsons(templates): + """Joins multiple JSON files to into one data structure + Used to join multiple template JSON files to create a global config dictionary. + @params templates : + List of template JSON files to join together + @return aggregated : + Dictionary containing the contents of all the input JSON files + """ + # Get absolute PATH to templates in git repo + repo_path = os.path.dirname(os.path.abspath(__file__)) + aggregated = {} + + for file in templates: + with open(os.path.join(repo_path, file), 'r') as fh: + aggregated.update(json.load(fh)) + + return aggregated + + +def check_cache(parser, cache, *args, **kwargs): + """Check if provided SINGULARITY_CACHE is valid. Singularity caches cannot be + shared across users (and must be owned by the user). Singularity strictly enforces + 0700 user permission on on the cache directory and will return a non-zero exitcode. + @param parser : + Argparse parser object + @param cache : + Singularity cache directory + @return cache : + If singularity cache dir is valid + """ + if not exists(cache): + # Cache directory does not exist on filesystem + os.makedirs(cache) + elif os.path.isfile(cache): + # Cache directory exists as file, raise error + parser.error("""\n\t\x1b[6;37;41mFatal: Failed to provided a valid singularity cache!\x1b[0m + The provided --singularity-cache already exists on the filesystem as a file. + Please run {} again with a different --singularity-cache location. + """.format(sys.argv[0])) + elif os.path.isdir(cache): + # Provide cache exists as directory + # Check that the user owns the child cache directory + # May revert to os.getuid() if user id is not sufficent + if exists(os.path.join(cache, 'cache')) and os.stat(os.path.join(cache, 'cache')).st_uid != os.getuid(): + # User does NOT own the cache directory, raise error + parser.error("""\n\t\x1b[6;37;41mFatal: Failed to provided a valid singularity cache!\x1b[0m + The provided --singularity-cache already exists on the filesystem with a different owner. + Singularity strictly enforces that the cache directory is not shared across users. + Please run {} again with a different --singularity-cache location. + """.format(sys.argv[0])) + + return cache + + +def unpacked(nested_dict): + """Generator to recursively retrieves all values in a nested dictionary. 
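+ Example: list(unpacked({'a': 1, 'b': {'c': 2}})) evaluates to [1, 2].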
+ @param nested_dict dict[]: + Nested dictionary to unpack + @yields value in dictionary + """ + # Iterate over all values of given dictionary + for value in nested_dict.values(): + # Check if value is of dict type + if isinstance(value, dict): + # If value is dict then iterate over + # all its values recursively + for v in unpacked(value): + yield v + else: + # If value is not dict type then + # yield the value + yield value + + +class Colors(): + """Class encoding for ANSI escape sequeces for styling terminal text. + Any string that is formatting with these styles must be terminated with + the escape sequence, i.e. `Colors.end`. + """ + # Escape sequence + end = '\33[0m' + # Formatting options + bold = '\33[1m' + italic = '\33[3m' + url = '\33[4m' + blink = '\33[5m' + higlighted = '\33[7m' + # Text Colors + black = '\33[30m' + red = '\33[31m' + green = '\33[32m' + yellow = '\33[33m' + blue = '\33[34m' + pink = '\33[35m' + cyan = '\33[96m' + white = '\33[37m' + # Background fill colors + bg_black = '\33[40m' + bg_red = '\33[41m' + bg_green = '\33[42m' + bg_yellow = '\33[43m' + bg_blue = '\33[44m' + bg_pink = '\33[45m' + bg_cyan = '\33[46m' + bg_white = '\33[47m' + + +def hashed(l): + """Returns an MD5 checksum for a list of strings. The list is sorted to + ensure deterministic results prior to generating the MD5 checksum. This + function can be used to generate a batch id from a list of input files. + It is worth noting that path should be removed prior to calculating the + checksum/hash. + @Input: + l list[]: List of strings to hash + @Output: + h : MD5 checksum of the sorted list of strings + Example: + $ echo -e '1\n2\n3' > tmp + $ md5sum tmp + # c0710d6b4f15dfa88f600b0e6b624077 tmp + hashed([1,2,3]) # returns c0710d6b4f15dfa88f600b0e6b624077 + """ + # Sort list to ensure deterministic results + l = sorted(l) + # Convert everything to strings + l = [str(s) for s in l] + # Calculate an MD5 checksum of results + h = hashlib.md5() + # encode method ensure cross-compatiability + # across python2 and python3 + h.update("{}\n".format("\n".join(l)).encode()) + h = h.hexdigest() + + return h + + +if __name__ == '__main__': + # Calculate MD5 checksum of entire file + print('{} {}'.format(md5sum(sys.argv[0]), sys.argv[0])) + # Calcualte MD5 cehcksum of 512 byte chunck of file, + # which is similar to following unix command: + # dd if=utils.py bs=512 count=1 2>/dev/null | md5sum + print('{} {}'.format(md5sum(sys.argv[0], first_block_only = True, blocksize = 512), sys.argv[0])) diff --git a/workflow/Snakefile b/workflow/Snakefile new file mode 100644 index 0000000..e8ca18b --- /dev/null +++ b/workflow/Snakefile @@ -0,0 +1,27 @@ +# Python standard library +from os.path import join +from os import listdir +import os, sys, json + +# 3rd party imports from pypi +from snakemake.workflow import workflow as wf_api +from snakemake.utils import R + +# Local imports +from scripts.common import ( + allocated, + provided, + references, + str_bool +) + + +# Final ouput files of the pipeline +rule all: + input: + [] + + +# Import rules +include: join("rules", "common.smk") +include: join("rules", "hooks.smk") diff --git a/workflow/rules/common.smk b/workflow/rules/common.smk new file mode 100644 index 0000000..a41b6b5 --- /dev/null +++ b/workflow/rules/common.smk @@ -0,0 +1 @@ +from scripts.common import abstract_location diff --git a/workflow/rules/hooks.smk b/workflow/rules/hooks.smk new file mode 100644 index 0000000..dfa4f47 --- /dev/null +++ b/workflow/rules/hooks.smk @@ -0,0 +1,99 @@ +# Adding handlers 
for displaying status of the +# pipeline and for getting job information for +# previously submitted jobs using `jobby`: +# https://github.com/OpenOmics/scribble/blob/main/scripts/jobby/jobby +if config['options']['mode'] == 'slurm': + onstart: + shell( + """ + # Move any job information for a previous + # instance of the pipeline to logfiles + sleep 5; rm -f COMPLETED FAILED RUNNING; + touch RUNNING + for f in job_information_*.tsv; do + # Skip over non-existant files + [ -e "${{f}}" ] || continue + mv ${{f}} logfiles/; + done + + for f in failed_jobs_*.tsv; do + # Skip over non-existant files + [ -e "${{f}}" ] || continue + mv ${{f}} logfiles/; + done + """ + ) + + onsuccess: + shell( + """ + # Get job information on all + # previously submitted jobs + sleep 15; rm -f COMPLETED FAILED RUNNING; + timestamp=$(date +"%Y-%m-%d_%H-%M-%S"); + ./workflow/scripts/jobby \\ + $(grep --color=never "^Submitted .* external jobid" logfiles/snakemake.log \\ + | awk '{{print $NF}}' \\ + | sed "s/['.]//g" \\ + | sort \\ + | uniq \\ + | tr "\\n" " " + ) \\ + > job_information_${{timestamp}}.tsv \\ + || {{ + # Handles edge case were only "rule all" + # is run and no child jobs are submitted + touch job_information_${{timestamp}}.tsv + }} + + # Get information on any child + # job(s) that may have failed + grep --color=never \\ + '^jobid\\|FAILED' \\ + job_information_${{timestamp}}.tsv \\ + > failed_jobs_${{timestamp}}.tsv \\ + || {{ + # Handles edge case were only "rule all" + # is run and no child jobs are submitted + touch failed_jobs_${{timestamp}}.tsv + }} + touch COMPLETED + """ + ) + + onerror: + shell( + """ + # Get job information on all + # previously submitted jobs + sleep 15; rm -f COMPLETED FAILED RUNNING; + timestamp=$(date +"%Y-%m-%d_%H-%M-%S"); + ./workflow/scripts/jobby \\ + $(grep --color=never "^Submitted .* external jobid" logfiles/snakemake.log \\ + | awk '{{print $NF}}' \\ + | sed "s/['.]//g" \\ + | sort \\ + | uniq \\ + | tr "\\n" " " + ) \\ + > job_information_${{timestamp}}.tsv \\ + || {{ + # Handles edge case were only "rule all" + # is run and no child jobs are submitted + touch job_information_${{timestamp}}.tsv + }} + + # Get information on any child + # job(s) that may have failed + grep --color=never \\ + '^jobid\\|FAILED' \\ + job_information_${{timestamp}}.tsv \\ + > failed_jobs_${{timestamp}}.tsv \\ + || {{ + # Handles edge case were only "rule all" + # is run and no child jobs are submitted + touch failed_jobs_${{timestamp}}.tsv + }} + touch FAILED + """ + ) diff --git a/workflow/scripts/common.py b/workflow/scripts/common.py new file mode 100644 index 0000000..0e51470 --- /dev/null +++ b/workflow/scripts/common.py @@ -0,0 +1,204 @@ +# Common helper functions shared across the entire workflow +def provided(samplelist, condition): + """ + Determines if optional rules should run. If an empty list is provided to rule all, + snakemake will not try to generate that set of target files. If a given condition + is not met (i.e. False) then it will not try to run that rule. + """ + + if not condition: + # If condition is False, + # returns an empty list + # to prevent rule from + # running + samplelist = [] + + return samplelist + + +def ignore(samplelist, condition): + """ + Determines if optional rules should run. If an empty list is provided to rule all, + snakemake will not try to generate that set of target files. If a given condition + is met (i.e. True) then it will not try to run that rule. This function is the + inverse to provided(). 
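+    Example (illustrative; `samples` and `skip_trimming` are hypothetical names):
+        ignore(expand("{s}.trimmed.fastq.gz", s=samples), skip_trimming)
+        # returns [] when skip_trimming is True, so rule all will not request
+        # those targets and the trimming rule is skipped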
+    """
+
+    if condition:
+        # If condition is True,
+        # returns an empty list
+        # to prevent rule from
+        # running
+        samplelist = []
+
+    return samplelist
+
+
+def s3_configured(uri):
+    """
+    Determines if the user can access an s3 object using their credentials saved in
+    "~/.aws/credentials" or "~/.boto". This handles an edge case where a user
+    has aws configured on the target system but the AWS Access Key Id provided in
+    their config does not match the s3 access policy. This usually occurs when a user
+    has set up aws, but it is set up for another AWS account or one that does not
+    have an IAM role for our S3 bucket.
+    :param uri : URI/URL to object in S3 bucket
+    :return accessible :
+        True if user can access S3 object, False if user cannot access the object (403)
+    """
+    import boto3
+    import botocore
+    import re
+
+    # Get bucket and key from s3 uri
+    parsed = re.match(r's3:\/\/(.+?)\/(.+)', uri)
+    bucket, key = parsed.groups()
+    accessible = True
+
+    try:
+        # Try to access object in s3 bucket
+        boto3.resource("s3").Object(bucket, key).load()
+    except botocore.exceptions.ClientError as e:
+        # User cannot access object in S3 bucket with the credentials
+        # stored in "~/.aws/credentials" or "~/.boto".
+        if e.response["Error"]["Code"] == "403":
+            accessible = False
+
+    return accessible
+
+
+def abstract_location(file_address, *args, **kwargs):
+    """
+    Determines if a provided file or list of file(s) resides in a remote location.
+    If file(s) are determined to reside in a remote store, like S3 or Google Cloud
+    Storage, Snakemake's remote wrapper is used to define remote files.
+    This can be extended further to support more of the file types listed here:
+    https://snakemake.readthedocs.io/en/stable/snakefiles/remote_files.html
+    Supported remote file options include: s3, gs, and sftp
+    Input: File path or a list of file paths list[]
+    Output: List of files or remote objects
+    """
+
+    # Check if user provided any input
+    if not file_address or file_address is None:
+        raise IOError("Failed to provide any input files! Input(s) are required to resolve required files.")
+
+    # If given a file path to one file, convert it to a list[]
+    file_list = [file_address] if isinstance(file_address, str) else file_address
+
+    # Loop through the list of provided files, and if a remote storage option
+    # is given, convert its index to a remote file object.
+    for i, uri in enumerate(file_list):
+        if uri.lower().startswith('s3://'):
+            # Remote option for S3 storage
+            import snakemake.remote.S3
+            import botocore.session
+
+            if botocore.session.get_session().get_credentials() and s3_configured(uri):
+                # AWS cli or boto has been configured on the target system
+                # See ~/.aws/credentials or ~/.boto
+                # https://boto.readthedocs.io/en/latest/boto_config_tut.html
+                remote_provider = snakemake.remote.S3.RemoteProvider()
+            else:
+                # If botocore cannot find credentials, try connecting unsigned.
+                # This will work for anonymous S3 resources if the resources in the
+                # s3 bucket are configured correctly.
+                # If a file is provided as input to a Snakemake rule, only read
+                # access is needed to access the remote S3 object.
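+                # Note: botocore.client is only available below because importing
+                # botocore.session loads it as a side effect; importing it explicitly
+                # avoids relying on that behavior.
+                import botocore.client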
+                remote_provider = snakemake.remote.S3.RemoteProvider(config=botocore.client.Config(signature_version=botocore.UNSIGNED))
+            file_list[i] = remote_provider.remote(uri, *args, **kwargs)
+
+        elif uri.lower().startswith('gs://'):
+            # Remote option for Google Cloud Storage
+            import snakemake.remote.GS
+            remote_provider = snakemake.remote.GS.RemoteProvider()
+            file_list[i] = remote_provider.remote(uri, *args, **kwargs)
+
+        elif uri.lower().startswith('sftp://'):
+            # Remote option for SFTP transfers
+            import snakemake.remote.SFTP
+            remote_provider = snakemake.remote.SFTP.RemoteProvider()
+            file_list[i] = remote_provider.remote(uri, *args, **kwargs)
+
+    return file_list
+
+
+def references(config, reflist):
+    """
+    Checks if a set of required reference files were provided. Some rules depend
+    on a set of required reference files that may only exist for specific reference
+    genomes. An example of this would be the blacklists for arriba. The blacklists are
+    manually curated and only exist for a few reference genomes (mm10, hg38, hg19).
+    If one of the required reference files does not exist, this returns False so
+    the dependent rule(s) can be skipped.
+    """
+
+    _all = True
+    for ref in reflist:
+        try: tmp = config['references'][ref]
+        # Check if ref exists in config
+        except KeyError:
+            _all = False
+            break
+        # Check if ref is an empty key string
+        if not tmp: _all = False
+
+    return _all
+
+
+def allocated(resource, rule, lookup, default="__default__"):
+    """Pulls resource information for a given rule. If a rule does not have any information
+    for a given resource type, then it will pull from the default. Information is pulled from
+    definitions in the cluster.json (which is used during job submission). This ensures that any
+    resources used at runtime mirror the resources that were allocated.
+    :param resource : resource type to look up in cluster.json (i.e. threads, mem, time, gres)
+    :param rule : rule to look up
+    :param lookup : Lookup containing allocation information (i.e. cluster.json)
+    :param default : default information to use if rule information cannot be found
+    :return allocation :
+        allocation information for a given resource type for a given rule
+    """
+
+    try:
+        # Try to get allocation information
+        # for a given rule
+        allocation = lookup[rule][resource]
+    except KeyError:
+        # Use default allocation information
+        allocation = lookup[default][resource]
+
+    return allocation
+
+
+def str_bool(s):
+    """Converts a string to a boolean. It is dangerous to try to
+    typecast a string into a boolean value using the built-in
+    `bool()` function. This function avoids any issues that can
+    arise when using `bool()`.
+    Example:
+        str_bool('True') returns True
+        str_bool('False') returns False
+        str_bool('asdas') raises TypeError
+    """
+    val = s.lower()
+    if val in ['true', '1', 'y', 'yes']:
+        return True
+    elif val in ['false', '0', 'n', 'no', '']:
+        return False
+    else:
+        # Provided value could not be
+        # type casted into a boolean
+        raise TypeError('Fatal: cannot type cast {} into a boolean'.format(val))
+
+
+def joint_option(prefix, valueslist):
+    """Joins a list while adding a common prefix.
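+    Useful for building repeated command-line flags from a list of values. Note
+    that an empty values list returns an empty string, i.e. joint_option('-i', []) == ''.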
+ Example: + joint_option('-i', [1,2,3]) + '-i 1 -i 2 -i 3' + """ + s = "" + for v in valueslist: + s += "{} {} ".format(prefix, v) + s = s.rstrip() + return s \ No newline at end of file diff --git a/workflow/scripts/jobby b/workflow/scripts/jobby new file mode 100755 index 0000000..60b7836 --- /dev/null +++ b/workflow/scripts/jobby @@ -0,0 +1,736 @@ +#!/usr/bin/env python3 +# -*- coding: UTF-8 -*- + +""" +ABOUT: + `jobby` will take your past jobs and display their job information. + Why? We have pipelines running on several different clusters and + job schedulers. `jobby` is an attempt to centralize and abstract + the process of querying different job schedulers. On each supported + target system, `jobby` will attempt to determine the best method for + getting job information to return to the user in a standardized + format and unified cli. + +REQUIRES: + - python>=3.5 + +DISCLAIMER: + PUBLIC DOMAIN NOTICE + NIAID Collaborative Bioinformatics Resource (NCBR) + + National Institute of Allergy and Infectious Diseases (NIAID) + This software/database is a "United States Government Work" under + the terms of the United States Copyright Act. It was written as + part of the author's official duties as a United States Government + employee and thus cannot be copyrighted. This software is freely + available to the public for use. + + Although all reasonable efforts have been taken to ensure the + accuracy and reliability of the software and data, NCBR do not and + cannot warrant the performance or results that may be obtained by + using this software or data. NCBR and NIH disclaim all warranties, + express or implied, including warranties of performance, + merchantability or fitness for any particular purpose. + + Please cite the author and NIH resources like the "Biowulf Cluster" + in any work or product based on this material. + +USAGE: + $ jobby [OPTIONS] JOB_ID [JOB_ID ...] + +EXAMPLE: + $ jobby 18627545 15627516 58627597 +""" + +# Python standard library +from __future__ import print_function, division +import sys, os, subprocess, math, re +from subprocess import PIPE +import argparse # added in python/3.5 +import textwrap # added in python/3.5 +import tempfile # added in python/3.5 + +# Jobby metadata +__version__ = 'v0.2.1' +__authors__ = 'Skyler Kuhn' +__email__ = 'skyler.kuhn@nih.gov' +__home__ = os.path.dirname(os.path.abspath(__file__)) +_name = os.path.basename(sys.argv[0]) +_description = 'Will take your job(s)... and display their information!' + + +# Classes +class Colors(): + """Class encoding for ANSI escape sequeces for styling terminal text. + Any string that is formatting with these styles must be terminated with + the escape sequence, i.e. `Colors.end`. 
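+    Example (illustrative):
+        print('{}{}Submitted jobs{}'.format(Colors.bold, Colors.cyan, Colors.end))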
+ """ + # Escape sequence + end = '\33[0m' + # Formatting options + bold = '\33[1m' + italic = '\33[3m' + url = '\33[4m' + blink = '\33[5m' + higlighted = '\33[7m' + # Text Colors + black = '\33[30m' + red = '\33[31m' + green = '\33[32m' + yellow = '\33[33m' + blue = '\33[34m' + pink = '\33[35m' + cyan = '\33[96m' + white = '\33[37m' + # Background fill colors + bg_black = '\33[40m' + bg_red = '\33[41m' + bg_green = '\33[42m' + bg_yellow = '\33[43m' + bg_blue = '\33[44m' + bg_pink = '\33[45m' + bg_cyan = '\33[46m' + bg_white = '\33[47m' + + +# Helper Functions +def which(cmd, path=None): + """Checks if an executable is in $PATH + @param cmd : + Name of executable to check + @param path : + Optional list of PATHs to check [default: $PATH] + @return : + True if exe in PATH, False if not in PATH + """ + if path is None: + path = os.environ["PATH"].split(os.pathsep) + + for prefix in path: + filename = os.path.join(prefix, cmd) + executable = os.access(filename, os.X_OK) + is_not_directory = os.path.isfile(filename) + if executable and is_not_directory: + return True + + return False + + +def err(*message, **kwargs): + """Prints any provided args to standard error. + kwargs can be provided to modify print functions + behavior. + @param message : + Values printed to standard error + @params kwargs + Key words to modify print function behavior + """ + print(*message, file=sys.stderr, **kwargs) + + + +def fatal(*message, **kwargs): + """Prints any provided args to standard error + and exits with an exit code of 1. + @param message : + Values printed to standard error + @params kwargs + Key words to modify print function behavior + """ + err(*message, **kwargs) + sys.exit(1) + + +def get_toolkit(tool_list): + """Finds the best suited tool from a list of + possible choices. Assumes tool list is already + ordered from the best to worst choice. The first + tool found in a user's $PATH is returned. + @param tool_list list[]: + List of ordered tools to find + @returns best_choice : + First tool found in tool_list + """ + best_choice = None + for exe in tool_list: + if which(exe): + best_choice = exe + break + + # Did not find any tools + # to potentially use + if not best_choice: + err( + 'Error: Did not find any tools to get job information!' + ) + fatal( + 'Expected one of the following tools to be in $PATH:' + '\t{0}'.format(tool_list) + ) + + return best_choice + + +def add_missing(linelist, insertion_dict): + """Adds missing information to a list. This can be used + to add missing job information fields to the results of + job querying tool. + @param linelist list[]: + List containing job information for each field of interest + @param insertion_dict dict[] = str + Dictionary used to insert missing information to a given + index, where the keys are indices of the `linelist` and the + values are information to add. Please note that the indices + should be zero based. 
Note that multiple consequetive values + should be inserted at once as a list, see example below: + Example: + add_missing([0,1,2,3,4], {3:['+','++'], 1:'-', 4:'@'}) + >> [0, '-', 1, 2, '+', '++', 3, '@', 4] + """ + # Get the order of indices + # add missing information + # starting from largest to + # smallest, if we insert + # missing values in this + # order we do not need to + # calculate the offset of + # new indices + tmp_list = linelist + indices = sorted(list(insertion_dict.keys()), reverse=True) + for i in indices: + # Check if multiple values + # need to be inserted at a + # given index + if isinstance(insertion_dict[i], list): + for v in reversed(insertion_dict[i]): + tmp_list.insert(i, v) + else: + tmp_list.insert(i, insertion_dict[i]) + return tmp_list + + +def convert_size(size_bytes): + """Converts bytes to a human readable format. + """ + # Sizes range from B to YiB, + # warning larger sizes storage + # may results in blackhole + size_name = ( + "B", "KiB", "MiB", + "GiB", "TiB", "PiB", + "EiB", "ZiB", "YiB" + ) + if size_bytes == 0: + return "0B" + i = int(math.floor(math.log(size_bytes, 1024))) + p = math.pow(1024, i) + s = round(size_bytes / p, 2) + return "{0}{1}".format(s, size_name[i]) + + +def to_bytes(size): + """Convert a human readable size unit into bytes. + Returns None if cannot convert/parse provided size.""" + size2bytes = { + "b":1, "bytes":1, "byte":1, + "k":1024, "kib":1024, "kb":1000, + "m": 1024**2, "mib": 1024**2, "mb": 1000**2, + "g": 1024**3, "gib": 1024**3, "gb": 1000**3, + "t": 1024**4, "tib": 1024**4, "tb": 1000**4, + "p": 1024**5, "pib": 1024**5, "pb": 1000**5, + "e": 1024**6, "eib": 1024**6, "eb": 1000**6, + "z": 1024**7, "zib": 1024**7, "zb": 1000**7, + "y": 1024**8, "yib": 1024**8, "yb": 1000**8 + } + + size = size.replace(' ','') + match = re.search('(?P[0-9.]+)(?P[a-zA-Z]+)$', size) + + if match: + human_units = match.group('units').lower() + human_units = human_units.lstrip().rstrip() + scaling_factor = size2bytes[human_units] + bytes = int(math.ceil(scaling_factor * float(match.group('size')))) + else: + # Cannot parse units, + # cannot convert value + # into bytes + return None + + return bytes + + + +# Core logic for getting +# job information +def sge(jobs, threads, tmp_dir): + """Displays SGE job information to standard output. + @param sub_args : + Parsed command-line arguments + @return None + """ + # NOTE: add later for SGE cluster + pass + + +def uge(jobs, threads, tmp_dir): + """Displays UGE job information to standard output. + @param sub_args : + Parsed command-line arguments + @return None + """ + # NOTE: add later for LOCUS cluster + pass + + +def dashboard_cli(jobs, threads=1, tmp_dir=None): + """Biowulf-specific tool to get SLURM job information. + HPC staff recommend using this over the default slurm + `sacct` command for performance reasons. 
By default, + the `dashboard_cli` returns information for the following + fields: + jobid state submit_time partition nodes + cpus mem timelimit gres dependency + queued_time state_reason start_time elapsed_time end_time + cpu_max mem_max eval + Runs command: + $ dashboard_cli jobs \\ + --joblist 12345679,12345680 \\ + --fields FIELD,FIELD,FIELD \\ + --tab --archive + """ + fields = [ + "jobid","jobname", + "state","partition", + "gres","cpus","mem", + "cpu_max","mem_max", + "timelimit","queued_time", + "start_time","end_time", + "elapsed_time","nodelist", + "user", "std_out", "std_err", + "work_dir" + ] + + # Display header information, + # --tab option does not print + # the header + print('\t'.join(fields)) + # Display job information + cmd = subprocess.run( + 'dashboard_cli jobs --archive --tab --joblist {0} --fields {1}'.format( + ','.join(jobs), + ','.join(fields) + ), + stdout=PIPE, + stderr=PIPE, + universal_newlines=True, + shell=True + ) + + # Check for failure + # of the last command + if cmd.returncode != 0: + err("\nError: Failed to get job information with 'dashboard_cli'!") + err('Please see error message below:') + fatal(' └── ', cmd.stderr) + + print(cmd.stdout.rstrip('\n')) + + +def get_slurm_version(): + """Gets the version of SLURM in the user's $PATH. + Runs `sacct -V` to get version information, and + returns a list containing ['Major','Minor','Patch'] + information. + @return sematic_version list[] + List containing Major, Minor, and Patch version. + """ + # Get SLURM's sematic version + cmd = subprocess.run( + 'sacct -V', + stdout=PIPE, + stderr=PIPE, + universal_newlines=True, + shell=True + ) # slurm 21.08.8-2 + sematic_version = cmd.stdout.rstrip('\n').split(' ')[-1] # 21.08.8-2 + sematic_version = sematic_version.split('-')[0].split('.') # ['21', '08', '8'] + sematic_version = [int(v) for v in sematic_version] # [21, 8, 8] + + return sematic_version + + +def sacct(jobs, threads=1, tmp_dir=None): + """Generic tool to get SLURM job information. + `sacct` should be available on all SLURM clusters. + The `dashboard_cli` is prioritized over using `sacct` + due to perform reasons; however, this method will be + portable across different SLURM clusters. To get maximum + memory usage for a job, we will need to parse the MaxRSS + field from the `$SLURM_JOBID.batch` lines. + Returns job information for the following fields: + jobid jobname state partition reqtres + alloccpus reqmem maxrss timelimit reserved + start end elapsed nodelist user + workdir + To get maximum memory usage for a job, we will need to parse + the MaxRSS fields from the `$SLURM_JOBID.batch` lines. 
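+    For example (illustrative values), a '12345679.batch' line with a MaxRSS of
+    '1523K' maps back to job 12345679 with a maximum memory usage of ~1.49MiB.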
+ Runs command: + $ sacct -j 12345679,12345680 \\ + --fields FIELD,FIELD,FIELD \\ + -P --delimiter $'\t' + """ + header = [ + "jobid","jobname","state","partition", + "gres","cpus","mem","cpu_max","mem_max", + "timelimit","queued_time","start_time", + "end_time","elapsed_time","nodelist", + "user","std_out","std_err", "work_dir" + ] + fields = [ + "jobid", "jobname", + "state", "partition", + "reqtres", "alloccpus", + "reqmem", "maxrss", + "timelimit", "reserved", + "start", "end", + "elapsed", "nodelist", + "user", "workdir" + ] + + # Get sematic version of SLURM + major, minor, patch = get_slurm_version() + if major >= 23: + # Output field changed in 23.X, + # "Reserved" changed to "Planned" + fields = [ + "jobid", "jobname", + "state", "partition", + "reqtres", "alloccpus", + "reqmem", "maxrss", + "timelimit", "planned", + "start", "end", + "elapsed", "nodelist", + "user", "workdir" + ] + + # Missing std_out and std_err + missing_fields = {15:['-','-']} + # Display header information, + print('\t'.join(header)) + # Display job information + cmd = subprocess.run( + "sacct -j {0} -P --delimiter $'\\t' --format={1}".format( + ','.join(jobs), + ','.join(fields) + ), + stdout=PIPE, + stderr=PIPE, + universal_newlines=True, + shell=True + ) + + # Check for failure + # of the last command + if cmd.returncode != 0: + err("\nError: Failed to get job information with 'sacct'!") + err('Please see error message below:') + fatal(' └── ', cmd.stderr) + + # Get max memory information, + # Stored as $SLURM_JOBID.batch + # in the MaxRSS field + j2m = {} + # Remove trailing newline from + # standard output and split lines + # on remaining newline characters + job_information = cmd.stdout.rstrip('\n').split('\n') + for i, line in enumerate(job_information): + if i < 1: + # skip over header + continue + linelist = line.lstrip().rstrip().split('\t') + if linelist[0].endswith('.batch'): + jobid = linelist[0].strip().split('.')[0] + maxmem = linelist[7].replace(' ', '') + mem_bytes = to_bytes(maxmem) + if not mem_bytes: + # Could not convert + # max_mem value into + # bytes + j2m[jobid] = '-' + continue # goto next line + + human_readable_mem = convert_size(mem_bytes) + j2m[jobid] = human_readable_mem + + # Display the results + for i, line in enumerate(job_information): + if i < 1: + # skip over header + continue + linelist = line.lstrip().rstrip().split('\t') + jobid = linelist[0].strip() + if '.' not in jobid: + try: + max_mem = j2m[jobid] + except KeyError: + # Job maybe still be + # running or in a non- + # completed state. + max_mem = '-' + status = linelist[2].split(' ')[0] + linelist[2] = status + missing_fields[8] = max_mem + linelist = add_missing(linelist, missing_fields) + linelist = [info if info else '-' for info in linelist] + print('\t'.join(linelist)) + + +def slurm(jobs, threads, tmp_dir): + """Displays SLURM job information to standard output. + @param sub_args : + Parsed command-line arguments + @return None + """ + # Try to use the following tools in this + # order to get job information! + # [1] `dashboard_cli` is Biowulf-specific + # [2] `sacct` should always be there + tool_priority = ['dashboard_cli', 'sacct'] + job_tool = get_toolkit(tool_priority) + # Get information about each job + # must use eval() to make string + # to callable function + eval(job_tool)(jobs=jobs, threads=threads, tmp_dir=tmp_dir) + + +def jobby(args): + """ + Wrapper to each supported job scheduler: slurm, etc. + Each scheduler has a custom handler to most effectively + get and parse job information. 
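+    Example (illustrative):
+        jobby 18627545 15627516 --scheduler slurm
+        # dispatches to the slurm() handler, which prefers `dashboard_cli`
+        # on Biowulf and falls back to the portable `sacct` method elsewhere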
+ @param sub_args : + Parsed command-line arguments + @return None + """ + # Get command line options + abstract_handler = None + job_ids = args.JOB_ID + scheduler = args.scheduler + threads = args.threads + tmp_dir = args.tmp_dir + + # Set handler for each + # supported scheduler + if scheduler == 'slurm': + abstract_handler = slurm + else: + # Unsupported job scheduler, + # needs to be implemented + fatal( + 'Error: "{0}" is an unsupported job scheduler!'.format(scheduler) + ) + + # Display job(s) information + # to standard output + abstract_handler( + jobs=job_ids, + threads=threads, + tmp_dir=tmp_dir + ) + + +# Parse command-line arguments +def parsed_arguments(name, description): + """Parses user-provided command-line arguments. This requires + argparse and textwrap packages. To create custom help formatting + a text wrapped docstring is used to create the help message for + required options. As so, the help message for require options + must be suppressed. If a new required argument is added to the + cli, it must be updated in the usage statement docstring below. + @param name : + Name of the pipeline or command-line tool + @param description : + Short description of pipeline or command-line tool + """ + # Add styled name and description + c = Colors + styled_name = "{0}{1}{2}{3}{4}".format( + c.bold, c.bg_black, + c.cyan, name, c.end + ) + description = "{0}{1}{2}".format(c.bold, description, c.end) + temp = tempfile.gettempdir() + + # Please note: update the usage statement + # below if a new option is added! + usage_statement = textwrap.dedent("""\ + {0}: {1} + + {3}{4}Synopsis:{5} + $ {2} [--version] [--help] \\ + [--scheduler {{slurm | ...}}] \\ + [--threads THREADS] [--tmp-dir TMP_DIR] \\ + + + {3}{4}Description:{5} + {2} will take your past jobs and display their job information + in a standardized format. Why???! We have pipelines running on several + different clusters (using different job schedulers). {2} centralizes + and abstracts the process of querying different job schedulers within + a unified command-line interface. + + For each supported scheduler, jobby will determine the best method + on a given target system for getting job information to return to the + user in a common output format. + + {3}{4}Required Positional Arguments:{5} + + Identiers of past jobs. One or more JOB_IDs + can be provided. Multiple JOB_IDs should be + seperated by a space. Information for each + of the JOB_IDs will be displayed to standard + output. Please see example section below for + more information. + + {3}{4}Options:{5} + -s,--scheduler {{slurm | ...}} + @Default: slurm + Job scheduler. Defines the job scheduler + of the target system. Additional support + for more schedulers coming soon! + @Example: --scheduler slurm + -n, --threads THREADS + @Default: 1 + Number of threads to query the scheduler + in parallel. + @Example: --threads: 8 + -t, --tmp-dir TMP_DIR + @Default: {7}/ + Temporary directory. Path on the filesystem + for writing temporary output files. Ideally, + this path should point to a dedicated space + on the filesystem for writing tmp files. If + you need to inject a variable into this path + that should NOT be expanded, please quote the + options value in single quotes. The default + location of this option is set to the system + default via the $TMPDIR environment variable. + @Example: --tmp-dir '/scratch/$USER/' + + -h, --help Shows help and usage information and exits. + @Example: --help + + -v, --version Displays version information and exits. 
+ @Example: --version + """.format(styled_name, description, name, c.bold, c.url, c.end, c.italic, temp)) + + # Display example usage in epilog + run_epilog = textwrap.dedent("""\ + {2}{3}Example:{4} + # Please avoid running jobby + # on a cluster's head node! + ./jobby -s slurm -n 4 18627542 13627516 58627597 48627666 + + {2}{3}Version:{4} + {1} + """.format(name, __version__, c.bold, c.url, c.end)) + + # Create a top-level parser + parser = argparse.ArgumentParser( + usage = argparse.SUPPRESS, + formatter_class=argparse.RawDescriptionHelpFormatter, + description = usage_statement, + epilog = run_epilog, + add_help=False + ) + + # Required Positional Arguments + # List of JOB_IDs, 1 ... N_JOB_IDS + parser.add_argument( + 'JOB_ID', + nargs = '+', + help = argparse.SUPPRESS + ) + + # Options + # Adding verison information + parser.add_argument( + '-v', '--version', + action = 'version', + version = '%(prog)s {}'.format(__version__), + help = argparse.SUPPRESS + ) + + # Add custom help message + parser.add_argument( + '-h', '--help', + action='help', + help=argparse.SUPPRESS + ) + + # Base directory to write + # temporary/intermediate files + parser.add_argument( + '-t', '--tmp-dir', + type = str, + required = False, + default = temp, + help = argparse.SUPPRESS + ) + + # Number of threads for the + # pipeline's main proceess + # This is only applicable for + # local rules or when running + # in local mode. + parser.add_argument( + '-n', '--threads', + type = int, + required = False, + default = 1, + help = argparse.SUPPRESS + ) + + # Job scheduler to query, + # available: SLURM, ... + # More coming soon! + parser.add_argument( + '-s', '--scheduler', + type = lambda s: str(s).lower(), + required = False, + default = "slurm", + choices = ['slurm'], + help = argparse.SUPPRESS + ) + + # Define handlers for each sub-parser + parser.set_defaults(func = jobby) + # Parse command-line args + args = parser.parse_args() + + return args + + +def main(): + # Sanity check for usage + if len(sys.argv) == 1: + # Nothing was provided + fatal('Invalid usage: {} [-h] [--version] ...'.format(_name)) + + # Collect args for sub-command + args = parsed_arguments( + name = _name, + description = _description + ) + + # Display version information + err('{} ({})'.format(_name, __version__)) + # Mediator method to call the + # default handler function + args.func(args) + + +if __name__ == '__main__': + main() diff --git a/workflow/scripts/utils.sh b/workflow/scripts/utils.sh new file mode 100644 index 0000000..f4a8334 --- /dev/null +++ b/workflow/scripts/utils.sh @@ -0,0 +1,38 @@ +#!/usr/bin/env bash + +# Functions +function err() { cat <<< "$@" 1>&2; } +function fatal() { err "$@"; exit 1; } +function abspath() { readlink -e "$1"; } + +function retry() { + # Tries to run a cmd 5 times before failing + # If a command is successful, it will break out of attempt loop + # Failed attempts are padding with the following exponential + # back-off strategy {4, 16, 64, 256, 1024} in seconds + # @INPUTS "$@"" = cmd to run + # @CALLS fatal() if command cannot be run in 5 attempts + local n=1 + local max=5 + local attempt=true # flag for while loop + while $attempt; do + # Attempt command and break if successful + "$@" && attempt=false || { + # Try again up to 5 times + if [[ $n -le $max ]]; then + err "Command failed: $@" + delay=$(( 4**$n )) + err "Attempt: ${n}/${max}. Trying again in ${delay} seconds!" + sleep $delay; + ((n++)) + else + fatal "Fatal: the command has failed after max attempts!" 
+ fi + } + done +} + + +export -f err +export -f fatal +export -f retry
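+
+
+# Example usage (illustrative):
+#   retry wget -O ref.fa.gz https://example.com/ref.fa.gz   # re-runs with exponential back-off
+#   abspath ../results                                       # resolves a relative path to an absolute path
+#   err "Warning: low disk space"                            # prints to standard error without exiting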