diff --git a/scripts/pipeline/README.md b/scripts/pipeline/README.md
new file mode 100644
index 00000000..01d6574c
--- /dev/null
+++ b/scripts/pipeline/README.md
@@ -0,0 +1,38 @@
+# Pipeline for training and testing
+
+This is a series of scripts for running training and benchmarking.
+
+## Why are they so complicated?
+
+They are designed for a cluster system which kills jobs after 24 hours, which makes it difficult to process the results of a job or to start a new job when the previous one didn't finish. Right now the code manually waits and polls until a job finishes (it may later be modified to start a follow-up job automatically, using the cluster system's built-in support for this).
+
+## How do I use this system?
+(These steps are subject to improvement.)
+
+### Prerequisites
+- A fairly recent version of Python with PyYAML installed.
+- Conda/Miniconda
+- Possibly a few more things not listed here
+
+### Steps to run
+- Create a directory of the form `YYYY-MM-DD_...`, e.g. `2023-02-23_A_train_model`, and copy the scripts directory into it, e.g. `2023-02-23_A_train_model/scripts`. (TODO: Make it runnable from another directory.)
+- Put a parameter file named `params.yaml` into your directory. (More on this below.)
+- From within `scripts`, run `sh run.sh` for short jobs or jobs which are themselves run inside a job system, or `sh tmux_run.sh` for long jobs where the outer job executor should run in `tmux`.
+
+### What is happening when I run this?
+Depending on your `params.yaml` it runs training and/or benchmarking. (Support for automatically benchmarking a freshly trained model is still to be added.) The scripts add the following subdirectories:
+- logs: This is where all the logs for the various jobs are stored.
+- results: This stores the results of the run, including trained model weights and TensorBoard files.
+- params: This is for communication between jobs.
+- workdir: This holds everything which can be deleted after all the jobs end, like conda environments, downloaded repos, etc.
+
+For training, it starts by installing a fresh conda environment and running training from it inside a job or tmux session, depending on your settings. After the training finishes or the job is killed, it uploads the results and continues training if needed.
+
+For benchmarking, it starts by installing a fresh conda environment and opam switch and running benchmarking from that environment/switch inside a job or tmux session, depending on your settings. Currently the benchmark jobs run sequentially. Further plans include running all the benchmarks in parallel, supporting long benchmarks which take over 24 hours (after which the job would get killed on the cluster this is designed for), and better summarization of the benchmark results.
+
+## Parameter files
+
+TODO: Include examples of a training parameter file and a benchmarking parameter file. (A rough sketch of the expected structure, reconstructed from what the scripts read, is given at the end of this README.)
+
+## Adding support for another cluster like SLURM
+If your cluster has no time limit on jobs, the scripts currently run everything sequentially, so it is easy to just run them from within a single job using the `tmux` settings in the parameter files. But it is also possible to add your own job system to the scripts fairly easily; a rough sketch of what a SLURM backend could look like is given below.
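+
+The following is a hypothetical, untested sketch of such a backend for `job_runner.py`, mirroring the existing `JbsubJobRunner`: submit the command with `sbatch` and poll `squeue` until the job leaves the queue. The `slurm_params` keys (`queue`, `cpus`, `gpus`, `mem_gb`) are assumptions chosen to mirror the jbsub settings, not something the scripts currently define; adapt them to your own `params.yaml`.
+
+```python
+# Hypothetical SLURM backend -- a sketch, not tested on a real cluster.
+import datetime
+import os
+import shlex
+import time
+from pathlib import Path
+
+from job_runner import JobRunnerBackEnd
+
+
+class SlurmJobRunner(JobRunnerBackEnd):
+    """Submit a command with sbatch and poll squeue until the job leaves the queue."""
+
+    def __init__(self, slurm_params: dict, log_dir: Path, job_name: str):
+        timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT-%H-%M-%SZ")
+        self._jobid = ""
+        self._sbatch_prefix = [
+            "sbatch",
+            "--parsable",  # print only the job id on submission
+            "--partition", slurm_params["queue"],
+            "--cpus-per-task", str(slurm_params["cpus"]),
+            "--mem", f"{slurm_params['mem_gb']}G",
+            "--output", str(log_dir / f"{timestamp}_{job_name}_job_%j_stdout.log"),
+            "--error", str(log_dir / f"{timestamp}_{job_name}_job_%j_stderr.log"),
+        ]
+        if slurm_params["gpus"]:
+            self._sbatch_prefix.extend(["--gres", f"gpu:{slurm_params['gpus']}"])
+
+    def _submit_job(self, cmd: list[str], verbose=True):
+        # sbatch needs a batch script or --wrap; wrap the whole command as one shell string
+        full_cmd = " ".join(self._sbatch_prefix) + " --wrap " + shlex.quote(" ".join(cmd))
+        if verbose:
+            print(full_cmd)
+        self._jobid = os.popen(full_cmd).read().strip().split(";")[0]
+        assert self._jobid, "sbatch did not return a job id"
+
+    def _wait_for_job(self, verbose=True):
+        while True:
+            state = os.popen(f"squeue -h -j {self._jobid} -o %T").read().strip()
+            if not state:
+                return  # squeue no longer lists the job, i.e. it finished
+            if verbose:
+                print(f"Job {self._jobid} state: {state}")
+            time.sleep(60)
+```
+
+To use it, a `slurm` case would also have to be added to `JobRunner.__init__` in `job_runner.py` so that `job_runner: slurm` in the `job` section of `params.yaml` selects this backend.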
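+
+## Sketch of a possible parameter file
+
+This is not one of the official examples promised above; it is a rough, untested sketch reconstructed from the keys the pipeline scripts actually read (`Utils.setup_conda`, `TrainPipeline`, `BenchmarkPipeline`, `JobRunner`). All concrete values are placeholders and some required fields may be missing. It is written as a short Python snippet, in the same language as the scripts, that dumps the structure to `params.yaml`:
+
+```python
+# Hypothetical helper that writes a skeleton params.yaml.
+# Key names are inferred from the pipeline scripts; all values are placeholders.
+from pathlib import Path
+
+import yaml
+
+example_params = {
+    "train": {
+        "model": "tfgnn",            # one of: hmodel, tf2, tfgnn
+        "max_restarts": 10,          # how many follow-up jobs may continue training
+        "conda_env": {
+            "python": "3.10",        # placeholder version
+            "setup_cmds": ["pip install <path-or-url-to-graph2tac>"],
+        },
+        "data": {
+            "datadir": "/path/to/datasets",   # placeholder
+            "dataset": "my_dataset",          # placeholder
+        },
+        "model_params": {},          # passed through to the trainer as a YAML file
+        "job": {
+            "job_runner": "tmux",    # or "jbsub", which also needs queue, cpus, gpus, mem_gb, require
+        },
+        "upload_model": {
+            "fileserver_ssh": "user@fileserver:/models",             # rsync target, placeholder
+            "fileserver_url": "https://fileserver.example/models/",  # placeholder
+            "spec_git_branch": "my-model-branch",
+            "spec": {},              # partial spec; see create_and_push_spec in build_model_spec.py
+        },
+    },
+    "benchmark": {
+        "conda_env": {"python": "3.10", "setup_cmds": []},
+        "benchmarks": [
+            {
+                "job": {"job_runner": "tmux"},
+                "spec_branch": "my-model-branch",
+                "spec": {},          # or give spec_repo and spec_commit of an existing spec
+                "benchmark_settings": {
+                    "coq-project": "<coq-project-to-benchmark>",     # placeholder
+                    # other keys here are passed straight to tactician-benchmark as flags;
+                    # benchmark-data, benchmark-repo, etc. are filled in by the scripts
+                },
+            },
+        ],
+    },
+}
+
+Path("params.yaml").write_text(yaml.dump(example_params))
+```
+
+Either the `train` section or the `benchmark` section (or both) can be present; each stage simply skips itself when its section is missing from `params.yaml`.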
\ No newline at end of file diff --git a/scripts/pipeline/scripts/1_initialize_script.py b/scripts/pipeline/scripts/1_initialize_script.py new file mode 100644 index 00000000..2c96899f --- /dev/null +++ b/scripts/pipeline/scripts/1_initialize_script.py @@ -0,0 +1,20 @@ +import os +from pathlib import Path +import yaml + +def setup_run_directory(): + print("Creating directories...") + Path("../logs").mkdir(exist_ok=True) + Path("../params").mkdir(exist_ok=True) + Path("../workdir").mkdir(exist_ok=True) + Path("../results").mkdir(exist_ok=True) + +def main(): + with Path("../params.yaml").open() as f: + params = yaml.safe_load(f) + + # set up the directories + setup_run_directory() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/scripts/pipeline/scripts/2_train_script.py b/scripts/pipeline/scripts/2_train_script.py new file mode 100644 index 00000000..41959580 --- /dev/null +++ b/scripts/pipeline/scripts/2_train_script.py @@ -0,0 +1,214 @@ +""" +Script governing all training runs + +Compute intensive code to be run on a compute node +""" +import datetime +import os +from pathlib import Path +import shutil +import urllib +import yaml + +import build_model_spec +from job_runner import JobRunner +from utils import Utils + + +class TrainPipeline: + run_dir_name: str + params: dict + dirs: dict[str, Path] + conda_env_path: Path + + def __init__(self, run_name: str, params: dict, outer_directories: dict[str, Path]): + self.run_dir_name = run_name + self.params = params + self.dirs = outer_directories.copy() + + def setup_train_dirs(self): + print() + print("========") + print("Setting up directories for training") + print("========") + print() + + # specialize some directories for training + for dir in ["params", "results", "workdir"]: + self.dirs[dir] = self.dirs[dir] / "train" + print(f"Make {self.dirs[dir]}") + self.dirs[dir].mkdir(exist_ok=True) + + print(f"Setting {self.dirs['workdir']} as workdir") + os.chdir(self.dirs["workdir"]) + + def setup_conda_env(self): + self.conda_env_path = self.dirs["workdir"] / "venv" + Utils.setup_conda(self.conda_env_path, self.params["conda_env"]) + + def extract_model_checkpoints(self) -> list[tuple[int, Path]]: + """Retrieve all available model checkpoint directories, + possibly moving files to results directory if needed. + + :return: List of all available checkpoints as (epoch, directory path) pairs + """ + model_type = self.params["model"] + + if model_type == "hmodel": + # There would be at most one hmodel results. + # It will have already been moved to this directory. 
+ model_dir = self.dirs["results"] / "hmodel" + if model_dir.exists(): + return [(0, model_dir)] + else: + return [] + + elif model_type == "tf2": + weights_dir = self.dirs["results"] / "weights" + checkpoints = [ + (int(cp.name.split("epoch")[1]), cp) + for cp in weights_dir.iterdir() + if cp.is_dir() and cp.name.startswith("checkpoint__epoch") + ] + return checkpoints + + elif model_type == "tfgnn": + # the results directory is the whole checkpoint, + # so for now just return the max checkpoint + weights_dir = self.dirs["results"] / "ckpt" + checkpoints = [ + int(cp.name.split(".")[0].split("ckpt-")[1]) + for cp in weights_dir.iterdir() + if cp.name.startswith("ckpt-") + ] + if checkpoints: + return [(max(checkpoints), self.dirs["results"])] + + raise Exception(f"Not reconized model type: ", model_type) + + def upload_model(self, model_dir, epoch): + # upload the model to the fileserver via rsync + fileserver_ssh = Path(self.params["upload_model"]["fileserver_ssh"]) + tar_file_name = Path(f"{self.run_dir_name}_epoch{epoch}.tar.gz") + os.system(f"tar -czvf {tar_file_name} -C {model_dir.parent} {model_dir.name}") + md5sum = os.popen(f"md5sum {tar_file_name}").read().split()[0] + new_tar_file_name = Path(f"{self.run_dir_name}_epoch{epoch}.{md5sum}.tar.gz") + os.system(f"rsync {tar_file_name} {fileserver_ssh / new_tar_file_name}") + + # use the partial spec to make a full spec + if self.params["upload_model"]["fileserver_url"]: + fileserver_url = self.params["upload_model"]["fileserver_url"] + spec_params = self.params["upload_model"]["spec"].copy() + spec_params["model_source_url"] = urllib.parse.urljoin(fileserver_url, str(new_tar_file_name)) + spec_params["model_md5sum"] = md5sum + spec_params["description"] = f"{self.run_dir_name}_epoch{epoch}" + + # TODO check the right architecture is used in the server + + git_branch = self.params["upload_model"]["spec_git_branch"] + git_commit = build_model_spec.create_and_push_spec(git_branch=git_branch, params=spec_params) + + return git_commit + else: + return None + + def train_loop(self): + # save model params + model_params = self.params["model_params"] + model_params_file = self.dirs["params"] / "all_model_params.yaml" + with model_params_file.open(mode="w") as f: + yaml.dump(model_params, f) + + dataset_dir = Path(self.params["data"]["datadir"]) / self.params["data"]["dataset"] + prev_epoch = -1 + for round in range(self.params["max_restarts"] + 1): + print() + print("========") + print("Run job to train model") + print("========") + print() + cmd = [ + "python", "-u", + self.dirs["scripts"] / "train_script.py", + "--model-arch", self.params["model"], + "--model-params", model_params_file, + "--round", round, + "--conda_env_path", self.conda_env_path, + "--paramsdir", self.dirs["params"], + "--datadir", dataset_dir, + "--resultsdir", self.dirs["results"], + "--workdir", self.dirs["workdir"], + ] + job_runner = JobRunner( + job_params=self.params["job"], + log_dir=self.dirs["logs"], + job_name="train" + ) + job_runner.run_cmd_and_block(cmd) + + checkpoints = self.extract_model_checkpoints() + + if not checkpoints: + print("Didn't generate any checkpoints. 
Stopping run.") + raise Exception("Didn't generate any checkpoints.") + + print() + print("========") + print("Processing result") + print("========") + print() + + epoch, checkpoint_dir = max(checkpoints) + if epoch > prev_epoch: + prev_epoch = epoch + spec_commit = self.upload_model(checkpoint_dir, epoch=epoch) + + # TODO: Create automatic benchmark config + # TODO: Benchmark checkpoint + + else: + print(f"No new checkpoint created!") + return # TODO: There is a better way to check this is done. + + + + + def clean_up(self): + print() + print("========") + print("Clean up") + print("========") + print() + + print(f"Remove {self.dirs['workdir']}") + shutil.rmtree(self.dirs['workdir']) + +def train( + run_name: str, + train_params: dict, + directories: dict[str, Path] +): + train_pipeline = TrainPipeline( + run_name=run_name, + params=train_params, + outer_directories=directories + ) + train_pipeline.setup_train_dirs() + train_pipeline.setup_conda_env() + train_pipeline.train_loop() + train_pipeline.clean_up() + +def main(): + params = Utils.get_params() + if "train" not in params: + print("No train parameters. No training to do.") + return + + train( + run_name=Utils.get_run_dir_name(), + train_params=params["train"], + directories=Utils.get_directories() + ) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/scripts/pipeline/scripts/3_benchmark_script.py b/scripts/pipeline/scripts/3_benchmark_script.py new file mode 100644 index 00000000..b15e1140 --- /dev/null +++ b/scripts/pipeline/scripts/3_benchmark_script.py @@ -0,0 +1,220 @@ +import os +from pathlib import Path +import yaml +import shutil +import datetime + +from build_model_spec import create_and_push_spec +from job_runner import JobRunner +from utils import Utils + + +class BenchmarkPipeline: + """ + Controls the overall benchmark process + """ + run_dir_name: str + params: dict + dirs: dict[str, Path] + conda_env_path: Path + opam_switch_path: Path + benchmark_system: Path + + def __init__(self, run_name: str, params: dict, outer_directories: dict[str, Path]): + self.run_dir_name = run_name + self.params = params + self.dirs = outer_directories.copy() + + def setup_benchmark_dirs(self): + print() + print("========") + print("Setting up directories for benchmarking") + print("========") + print() + + # specialize some directories for benchmarking + for dir in ["params", "results", "workdir"]: + self.dirs[dir] = self.dirs[dir] / "benchmark" + print(f"Make {self.dirs[dir]}") + self.dirs[dir].mkdir(exist_ok=True) + + print(f"Setting {self.dirs['workdir']} as workdir") + os.chdir(self.dirs["workdir"]) + + def setup_conda_opam_env(self): + # conda + self.conda_env_path = self.dirs["workdir"] / "venv" + Utils.setup_conda(self.conda_env_path, self.params["conda_env"]) + conda_run_prefix = f"conda run -p {self.conda_env_path} --live-stream" + + # benchmark_system + self.benchmark_system = self.dirs["workdir"] / "benchmark-system" + os.system(f"git clone git@github.com:coq-tactician/benchmark-system.git {self.benchmark_system}") + + # opam + self.opam_switch_path = self.dirs["workdir"] # not created yet + # just hard-code these for now + opam_env_params = { + "setup_cmds": ["opam update", f"opam install --yes {self.benchmark_system}"] + } + Utils.setup_opam(conda_run_prefix, self.opam_switch_path, opam_env_params) + + def clean_up(self): + pass + + +class SingleBenchmarkPipeline: + """ + Controls a single benchmark + """ + run_name: str + benchmark_id: str + params: dict + conda_env_path: Path + 
opam_switch_path: Path + benchmark_system: Path + + def __init__(self, run_name: str, ix: int, params: dict, outer_directories: dict, conda_env_path: Path, opam_switch_path: Path, benchmark_system: Path): + self.run_name=run_name + self.ix=ix + self.params=params.copy() + self.dirs=outer_directories.copy() + self.conda_env_path=conda_env_path + self.opam_switch_path=opam_switch_path + self.benchmark_system=benchmark_system + + def setup_benchmark_dirs(self): + print() + print("========") + print("Setting up directories for benchmarking") + print("========") + print() + + # specialize some directories for benchmarking + for dir in ["params", "results", "workdir"]: + self.dirs[dir] = self.dirs[dir] / ("benchmark_" + str(self.ix)) + print(f"Make {self.dirs[dir]}") + self.dirs[dir].mkdir(exist_ok=True) + + print(f"Setting {self.dirs['workdir']} as workdir") + os.chdir(self.dirs["workdir"]) + + def build_and_upload_benchmark_spec(self): + if "spec_repo" in self.params and "spec_commit" in self.params: + print("Spec commit given:") + print(self.params["spec_repo"]) + print() + return + + if "spec_repo" not in self.params: + self.params["spec_repo"] = "git+ssh://git@github.com/coq-tactician/coq-graph2tac-trained" + + #TODO(jrute): Add support for uploading specs to other locations + assert self.params["spec_repo"] == "git+ssh://git@github.com/coq-tactician/coq-graph2tac-trained", ( + "Don't yet have support for creating specs in other benchmark repos" + ) + spec_commit = create_and_push_spec( + git_branch=self.params["spec_branch"], + params=self.params["spec"] + ) + self.params["spec_commit"] = spec_commit + + def clone_data_repo(self): + self.datadir = self.dirs["workdir"] / "benchmark-data" + os.system(f"git clone git@github.com:coq-tactician/benchmark-data.git {self.datadir}") + + def update_params(self): + benchmark_settings = self.params["benchmark_settings"] + benchmark_settings["benchmark-data"] = str(self.datadir) + benchmark_settings["benchmark-repo"] = self.params["spec_repo"] + benchmark_settings["benchmark-commit"] = self.params["spec_commit"] + benchmark_settings["compile-allocator"] = str(self.benchmark_system / "local" / "compile_allocator") + benchmark_settings["bench-allocator"] = str(self.benchmark_system / "local" / "bench_allocator") + + def remove_data_repo(self): + shutil.rmtree(self.datadir) + + def run_benchmark(self): + # save model params + benchmark_params_file = self.dirs["params"] / "benchmark_params.yaml" + with benchmark_params_file.open(mode="w") as f: + yaml.dump(self.params, f) + + print() + print("========") + print("Run job to benchmark model") + print("========") + print() + + # TODO(jrute): Handle if need to restart job + round = 0 + cmd = [ + "python", "-u", + self.dirs["scripts"] / "benchmark_script.py", + "--params", benchmark_params_file, + "--round", round, + "--conda_env_path", self.conda_env_path, + "--opam_switch_path", self.opam_switch_path, + "--datadir", self.datadir, + "--paramsdir", self.dirs["params"], + "--resultsdir", self.dirs["results"], + "--workdir", self.dirs["workdir"], + ] + job_runner = JobRunner( + job_params=self.params["job"], + log_dir=self.dirs["logs"], + job_name="benchmark_" + str(self.ix) + ) + job_runner.run_cmd_and_block(cmd) + + +def benchmark( + run_name: str, + benchmark_params: dict, + directories: dict[str, Path] +): + benchmark_pipeline = BenchmarkPipeline( + run_name=run_name, + params=benchmark_params, + outer_directories=directories + ) + benchmark_pipeline.setup_benchmark_dirs() + 
benchmark_pipeline.setup_conda_opam_env() + + for ix, single_benchmark_params in enumerate(benchmark_params["benchmarks"]): + #TODO(jrute): Make this NOT single threaded + single_benchmark_pipeline = SingleBenchmarkPipeline( + run_name=run_name, + ix=ix, + params=single_benchmark_params, + outer_directories=benchmark_pipeline.dirs, + conda_env_path=benchmark_pipeline.conda_env_path, + opam_switch_path=benchmark_pipeline.opam_switch_path, + benchmark_system=benchmark_pipeline.benchmark_system + ) + single_benchmark_pipeline.setup_benchmark_dirs() + single_benchmark_pipeline.build_and_upload_benchmark_spec() + single_benchmark_pipeline.clone_data_repo() + single_benchmark_pipeline.update_params() + single_benchmark_pipeline.run_benchmark() + single_benchmark_pipeline.remove_data_repo() + + benchmark_pipeline.clean_up() + +def main(): + with Path("../params.yaml").open() as f: + params = yaml.safe_load(f) + + # set up the directories + if "benchmark" not in params: + print("No benchmark parameters. No benchmarking to do.") + return + + benchmark( + run_name=Utils.get_run_dir_name(), + benchmark_params=params["benchmark"], + directories=Utils.get_directories() + ) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/scripts/pipeline/scripts/benchmark_script.py b/scripts/pipeline/scripts/benchmark_script.py new file mode 100644 index 00000000..4e17689e --- /dev/null +++ b/scripts/pipeline/scripts/benchmark_script.py @@ -0,0 +1,142 @@ +""" +Script governing benchmarking + +Compute intensive code to be run on a compute node +""" +import argparse +from dataclasses import dataclass +import os +from pathlib import Path +import shutil +import tempfile +from typing import Any +import yaml + + +@dataclass +class Benchmarker: + round: int + conda_env_path: Path + opam_switch_path: Path + paramsdir: Path + resultsdir: Path + datadir: Path + workdir: Path + params: dict[str, Any] + + def build_script(self) -> str: + benchmark_settings = self.params["benchmark_settings"].copy() + + # to run something in opam, put it in a script starting with `eval $(opam env)`` + script = [] + script.append("opam update") + script.append(f"eval $(opam env --switch={self.opam_switch_path})") + script.append("") + script.append("export PATH=$CONDA_PREFIX/bin:$PATH") + script.append("export CPATH=$CONDA_PREFIX/include:$CPATH") + script.append("") + script.append("tactician-benchmark \\") + for setting, value in self.params["benchmark_settings"].items(): + if setting != "coq-project": + script.append(f"-{setting} {value} \\") + coq_project = benchmark_settings["coq-project"] + script.append(f"{coq_project}") + script.append("") + + return "\n".join(script) + + def run_benchmark(self) -> int: + script = self.build_script() + + print(script) + + # to run an opam command, make a temporary script and then run that + conda_run_prefix = f"conda run -p {self.conda_env_path} --live-stream" + script_sh = self.workdir / "run_benchmark.sh" + script_sh.write_text(script) + exit_code = os.system(f"{conda_run_prefix} sh {script_sh}") + + return exit_code + + def cleanup(self): + #TODO(jrute): Find way to indicate that benchmark finished successfully + #probably deleting benchmark results dir + pass + +def read_args() -> argparse.Namespace: + parser = argparse.ArgumentParser() + parser.add_argument( + "--params", + type=Path, + help="YAML parameter file for the model" + ) + parser.add_argument( + "--conda_env_path", + type=Path, + help="Location of conda environment" + ) + parser.add_argument( + 
"--opam_switch_path", + type=Path, + help="Location of opam environment" + ) + parser.add_argument( + "--round", + type=int, + help="Number of restarts (starting at zero)" + ) + parser.add_argument( + "--workdir", + type=Path, + help="Directory to run from (files won't be saved)" + ) + parser.add_argument( + "--datadir", + type=Path, + help="Benchmark data" + ) + parser.add_argument( + "--resultsdir", + type=Path, + help="Location to store trained models and other results" + ) + parser.add_argument( + "--paramsdir", + type=Path, + help="Location to store parameter yaml files and other communication files" + ) + args = parser.parse_args() + + # check inputs + assert args.params.is_file(), f"{args.params} must be an existing file" + assert str(args.params).endswith(".yaml"), f"{args.params} must be a .yaml file" + + return args + +def main(): + args = read_args() + + with Path(args.params).open() as f: + params = yaml.safe_load(f) + + with tempfile.TemporaryDirectory() as tmpdirname: + benchmarker = Benchmarker( + round=args.round, + conda_env_path=args.conda_env_path, + opam_switch_path=args.opam_switch_path, + paramsdir=args.paramsdir, + resultsdir=args.resultsdir, + datadir=args.datadir, + workdir=args.workdir, + params=params + ) + exit_code = benchmarker.run_benchmark() + benchmarker.cleanup() + + if exit_code: + print() + print(f"Did not exit properly. Exit code: {exit_code}") + exit(exit_code) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/scripts/pipeline/scripts/build_model_spec.py b/scripts/pipeline/scripts/build_model_spec.py new file mode 100644 index 00000000..e612769a --- /dev/null +++ b/scripts/pipeline/scripts/build_model_spec.py @@ -0,0 +1,255 @@ +#!/usr/bin/env python3 + +import argparse +import os +from pathlib import Path +import tempfile +from typing import Optional, List, Dict + +import yaml + + +OPAM_PATH = Path("coq-graph2tac.opam") +OPAM_TEMPLATE = """opam-version: "2.0" +synopsis: "Graph neural network that predicts tactics for Tactician" +description: "" +maintainer: ["Lasse Blaauwbroek "] +authors: [ + "Lasse Blaauwbroek" + "Mirek Olsak" + "Vasily Pestun" + "Jelle Piepenbrock" + "Jason Rute" + "Fidel I. Schaposnik Massolo" +] +homepage: "https://coq-tactician.github.io" +bug-reports: + "https://github.com/pestun/graph2tac/issues" +build: [ + [ "tar" "-xzf" "model.tar.gz" "--one-top-level" "--strip-components" "1" ] +] +install: [ + [ "mkdir" "-p" "%{share}%/%{name}%/" ] + [ "cp" "-r" "model/" "%{share}%/%{name}%/" ] + + [ "cp" "Graph2TacConfig.v" "%{lib}%/coq/user-contrib/Tactician/Graph2TacConfig.v" ] + + # We have to make sure that our injection flags get loaded after the injection flags of coq-tactician-reinforce. + # We do this by using a name that is guaranteed to sort after coq-tactician-reinforce. 
+ [ "mkdir" "-p" "%{share}%/coq-tactician/plugins/coq-tactician-reinforce-%{name}%/" ] + [ "cp" "injection-flags" "%{share}%/coq-tactician/plugins/coq-tactician-reinforce-%{name}%/" ] +] +dev-repo: "git+https://github.com/pestun/graph2tac.git" +pin-depends: [ + [ + "coq-tactician.8.11.dev" + "git+https://github.com/coq-tactician/coq-tactician.git#COQ_TACTICIAN_COMMIT" + ] + [ + "coq-tactician-reinforce.8.11.dev" + "git+ssh://git@github.com/coq-tactician/coq-tactician-reinforce.git#COQ_TACTICIAN_REINFORCE_COMMIT" + ] +] +depends: [ + "coq-tactician-reinforce" + "coq-tactician" +] +extra-source "model.tar.gz" { + src: "MODEL_SOURCE_URL" + checksum: "md5=MODEL_MD5SUM" +} +substs: [ + "Graph2TacConfig.v" +] +""" +OPAM_PARAMS = ["COQ_TACTICIAN_COMMIT", "COQ_TACTICIAN_REINFORCE_COMMIT", "MODEL_SOURCE_URL", "MODEL_MD5SUM"] + +PREREQUISITES_PATH = Path("prerequisites") +PREREQUISITES_TEMPLATE = """#!/usr/bin/env bash +set -ue +python3 -m venv ./venv +. ./venv/bin/activate +pip install GRAPH2TAC_PATH +""" +PREREQUISITES_PARAMS = ["GRAPH2TAC_PATH"] + +INJECTION_FLAGS_PATH = Path("injection-flags") +INJECTION_FLAGS_TEMPLATE = """-l Graph2TacConfig.v +""" +INJECTION_FLAGS_PARAMS = [] + +IN_V_PATH = Path("Graph2TacConfig.v.in") +IN_V_TEMPLATE = """INJECTIONS +""" +IN_V_PARAMS = ["INJECTIONS"] + +UPDATE_ENV_PATH = Path("update-env") +UPDATE_ENV_TEMPLATE = """#!/usr/bin/env bash +comm -13 <(env | sort) <(source ./venv/bin/activate && env | sort) +""" +UPDATE_ENV_PARAMS = [] + +def build_file_from_template( + filepath: Path, + template: str, + parameter_variables: List[str], + parameter_values: List[str] +): + """Build a file from a template. + + :param filepath: Path where the file will be created. + :type filepath: Path + :param template: A multi-line template including keywords to be replaced. + :type template: str + :param parameter_variables: List of keywords to replace in the template. + :type parameter_variables: List[str] + :param parameter_values: List of values to replace in each keyword. + :type parameter_values: List[str] + """ + file_parts: List[str] = [] + template_remainder = template + assert len(parameter_variables) == len(parameter_values) + assert all(isinstance(v, str) for v in parameter_values), f"Expected a list of strings. Got: {parameter_values}" + for p_var, p_value in zip(parameter_variables, parameter_values): + assert template_remainder.count(p_var) == 1 + part, template_remainder = template_remainder.split(p_var) + file_parts.append(part) + file_parts.append(p_value) + file_parts.append(template_remainder) + with filepath.open("w") as f: + f.write("".join(file_parts)) + +def read_params() -> tuple[str, dict]: + parser = argparse.ArgumentParser() + parser.add_argument( + "--git-branch", + type=str, + help="Git branch to save this to. 
(For now assume it is unique.)" + ) + parser.add_argument( + "--params", + type=Path, + help="YAML parameter file" + ) + args = parser.parse_args() + + # check inputs + assert args.params.is_file(), f"{args.params} must be an existing file" + assert str(args.params).endswith(".yaml"), f"{args.params} must be a .yaml file" + + with args.params.open() as f: + param_dict=yaml.safe_load(f) + + return args.git_branch, param_dict + +def clone_repo(temp_dir: Path, branch_or_commit: str, new_branch: Optional[str]): + os.chdir(temp_dir) + os.system("git clone git@github.com:coq-tactician/coq-graph2tac-trained.git") + #TODO: check that it is there + os.chdir("coq-graph2tac-trained") + os.system(f"git checkout {branch_or_commit}") + #TODO: check that this worked + os.system(f"git checkout {new_branch} || git checkout -b {new_branch}") + #TODO: check that this worked + +def modify_files(temp_dir: Path, params: Dict[str, str]): + repo = temp_dir / "coq-graph2tac-trained" + os.chdir(repo) + build_file_from_template( + filepath=OPAM_PATH, + template=OPAM_TEMPLATE, + parameter_variables=OPAM_PARAMS, + parameter_values=[ + params["coq_tactician_commit"], + params["coq_tactician_reinforce_commit"], + params["model_source_url"], + params["model_md5sum"], + ] + ) + build_file_from_template( + filepath=PREREQUISITES_PATH, + template=PREREQUISITES_TEMPLATE, + parameter_variables=PREREQUISITES_PARAMS, + parameter_values=[ + params["graph2tac_path"], + ] + ) + build_file_from_template( + filepath=INJECTION_FLAGS_PATH, + template=INJECTION_FLAGS_TEMPLATE, + parameter_variables=INJECTION_FLAGS_PARAMS, + parameter_values=[] + ) + build_file_from_template( + filepath=IN_V_PATH, + template=IN_V_TEMPLATE, + parameter_variables=IN_V_PARAMS, + parameter_values=[ + "\n".join(params["injection_lines"]), + ] + ) + build_file_from_template( + filepath=UPDATE_ENV_PATH, + template=UPDATE_ENV_TEMPLATE, + parameter_variables=UPDATE_ENV_PARAMS, + parameter_values=[] + ) + with Path("params.yaml").open("w") as f: + yaml.safe_dump(params, f) + +def commit_and_push_repo(temp_dir: Path, commit_message: str, git_branch: str) -> str: + repo = temp_dir / "coq-graph2tac-trained" + os.chdir(repo) + print(os.popen("git diff").read()) + os.system("git add *") + os.system(f"git commit --allow-empty-message --allow-empty -m '{commit_message}'") + + # push + os.system(f"git push || git push --set-upstream origin {git_branch}") + git_commit = os.popen('git rev-parse HEAD').read().strip() + return git_commit + +def communicate_git_commit(git_commit: str): + print(f"Git commit: {git_commit}") + +def create_and_push_spec(git_branch: str, params: Dict[str, str]) -> str: + assert set(params.keys()) == { + "coq_tactician_commit", + "coq_tactician_reinforce_commit", + "model_source_url", + "model_md5sum", + "graph2tac_path", + "injection_lines", + "description" + }, params.keys() + + cwd = Path.cwd() + + with tempfile.TemporaryDirectory() as tmpdirname: + temp_dir = Path(tmpdirname) + clone_repo( + temp_dir=temp_dir, + branch_or_commit = "main", + new_branch = git_branch + ) + modify_files( + temp_dir=temp_dir, + params=params + ) + git_commit = commit_and_push_repo( + temp_dir=temp_dir, + commit_message=params["description"], + git_branch=git_branch + ) + + os.chdir(cwd) + return git_commit + +def main(): + git_branch, params = read_params() + git_commit = create_and_push_spec(git_branch=git_branch, params=params) + communicate_git_commit(git_commit) + +if __name__ == "__main__": + main() + \ No newline at end of file diff --git 
a/scripts/pipeline/scripts/jbsub_tools.py b/scripts/pipeline/scripts/jbsub_tools.py new file mode 100644 index 00000000..69f9c514 --- /dev/null +++ b/scripts/pipeline/scripts/jbsub_tools.py @@ -0,0 +1,149 @@ +import os +import re +import time +from typing import Any + +def submit_job(jbsub_prefix: list[Any], cmd: list[Any], verbose=True) -> tuple[str, str]: + full_cmd = " ".join(str(c) for c in jbsub_prefix + cmd) + if verbose: + print(full_cmd) + # TODO: This isn't capturing the stderr if the command has a mistake + jbsub_output = os.popen(full_cmd).read() + + + # extract jobid from jbsub output + m = re.search(r"Job <(\d*)> is submitted", jbsub_output) + assert m is not None, "Could not submit job. Here is the jbsub output:\n" + jbsub_output + jobid = m[1] + + return jobid, jbsub_output + +def job_info(jobid: str) -> tuple[str, str]: + """Get job info. (Work in progress) + + :return: full job info, job state (PEND, AVAIL, PSUSP, USUSP, RUN, SSUSP, DONE, EXIT) + :rtype: tuple[str, str] + """ + + info = os.popen(f"jbinfo -long-long {jobid}").read() + + # get status + m = re.search(r"Status <(\w*)>", info) + assert m is not None, info + return info, m[1] # full_info, status + +def wait_for_job(jobid: str, verbose=True, max_wait_sec=610) -> tuple[str, str]: + prev_status = "" + info_lines_printed = 0 + while True: + # wait until the status has changed + # use fibonacci numbers as amount of time to wait until reach max + fib0 = 0 + fib1 = 1 + while True: + time.sleep(min(fib0, max_wait_sec)) + fib1, fib0 = fib0 + fib1, fib1 + + info, status = job_info(jobid) + if status != prev_status: + break + + if verbose: + print() + print("Job status:", status) + # find first line in info without `<` + info_lines = info.split("\n") + for line in info_lines[info_lines_printed:]: + if "<" in line: + print(line) + info_lines_printed += 1 + else: + break # stop at first line without `<...>` pattern + + if status in ["DONE", "EXIT"]: + break + else: + prev_status = status + + if verbose: + # wait one second to make sure I have all the info + time.sleep(1) + info, status = job_info(jobid) + print("\n".join(info.split("\n")[info_lines_printed:])) + + return info, status + +def wait_for_jobs(jobids: list[str], verbose=True, max_wait_sec=610) -> tuple[list[str], list[str]]: + prev_statuses = ["" for _ in jobids] + info_lines_printed = [0 for _ in jobids] + while True: + # wait until the status has changed + # use fibonacci numbers as amount of time to wait until reach max + fib0 = 0 + fib1 = 1 + while True: + time.sleep(min(fib0, max_wait_sec)) + fib1, fib0 = fib0 + fib1, fib1 + + infos = [] + statuses = [] + changed = [] + for jobid, prev_status in zip(jobids, prev_status): + info, status = job_info(jobid) + infos.append(info) + statuses.append(status) + changed.append(status != prev_status) + + if any(changed): + break + + if verbose: + new_info_lines_printed = [] + for jobid, info, status, info_line_printed, changed in zip(jobids, infos, statuses,info_lines_printed, changed): + if not changed: + new_info_lines_printed.append(info_line_printed) + continue + print() + print(f"Job {jobid} status:", status) + # find first line in info without `<` + info_lines = info.split("\n") + new_info_line_printed = info_line_printed + for line in info_lines[info_line_printed:]: + if "<" in line: + print(line) + new_info_line_printed += 1 + else: + break # stop at first line without `<...>` pattern + new_info_lines_printed.append(new_info_line_printed) + info_lines_printed = new_info_lines_printed + if all(s in ["DONE", 
"EXIT"] for s in statuses): + break + else: + prev_statuses = statuses + + + # wait one second to make sure I have all the info + time.sleep(1) + infos = [] + statuses = [] + for jobid, info_line_printed in zip(jobids, info_lines_printed): + info, status = job_info(jobid) + infos.append(info) + statuses.append(status) + if verbose: + print("\n".join(info.split("\n")[info_line_printed:])) + + return infos, statuses + + +def wait_while_job_is(jobid: str, statuses: list[str], max_wait_sec=610) -> tuple[str, str]: + # use fibonacci numbers as amount of time to wait until reach max + fib0 = 0 + fib1 = 1 + while True: + time.sleep(min(fib0, max_wait_sec)) + fib1, fib0 = fib0 + fib1, fib1 + + info, state = job_info(jobid) + if state not in statuses: + return info, state \ No newline at end of file diff --git a/scripts/pipeline/scripts/job_runner.py b/scripts/pipeline/scripts/job_runner.py new file mode 100644 index 00000000..971b2299 --- /dev/null +++ b/scripts/pipeline/scripts/job_runner.py @@ -0,0 +1,162 @@ +import datetime +import os +from pathlib import Path +import time +from typing import Any +import jbsub_tools + +class JobRunnerBackEnd: + """ + Abstract class for running jobs in a cluster or just in a background process on a machine. + """ + + def _submit_job(self, cmd: list[str], verbose=True): + """Abstract method to submit a job in a cluster or a background process. + + :param cmd: List of strings that, when concatenated, form a linux command, e.g. ["echo", "'hello world'"] + :param verbose: Print additional data about the job, defaults to True + """ + pass + + def _wait_for_job(self, verbose=True): + """Abstract method to block until the submitted running job completes. + + :param verbose: Print periodic information about the job status, defaults to True + """ + pass + + +class TmuxJobRunner(JobRunnerBackEnd): + """ + Runner for running jobs in tmux. + """ + + def __init__(self, tmux_params: dict, log_dir: Path, job_name: str): + timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT-%H-%M-%SZ") + self._tmux_session_id = f"{timestamp}_{job_name}" + self._tmux_prefix = self.build_tmux_cmd(self._tmux_session_id) + self._log_file = log_dir / f"{timestamp}_job_{job_name}.log" + + @staticmethod + def build_tmux_cmd(session_id: str) -> list[str]: + prefix = ["tmux", "new-session"] # start a new session + prefix.extend(["-s", session_id]) # name the session + prefix.extend(["-d"]) # run in dettached mode (so it runs like a job in the background) + return prefix + + def _submit_job(self, cmd: list[str], verbose=True): + """Submit a job in a cluster or a background process. + + :param cmd: List of strings that, when concatenated, form a linux command, e.g. ["echo", "'hello world'"] + :param verbose: Print additional data about the job, defaults to True + """ + prefix = " ".join(str(c) for c in self._tmux_prefix) + inner_cmd = " ".join(cmd) + full_cmd = f"{prefix} '{inner_cmd} 2>&1 | tee {self._log_file}'" + if verbose: + print(full_cmd) + + tmux_output = os.popen(full_cmd).read() + + assert tmux_output == "", "Could not submit job. Here is the tmux output:\n" + tmux_output + + def _wait_for_job(self, verbose=True): + """Block until the submitted running job completes. 
+ + :param verbose: Print periodic information about the job status, defaults to True + """ + printed_session_info = False + while True: + tmux_sessions = os.popen("tmux list-sessions").read() + this_job_sessions = [s for s in tmux_sessions.split() if s.startswith(self._tmux_session_id)] + assert len(this_job_sessions) <= 1, this_job_sessions + if this_job_sessions: + if verbose: + print("\nTmux job running:") + print(this_job_sessions[0]) + print("") + printed_session_info = True + else: + return # session finished + time.sleep(60) + + +class JbsubJobRunner(JobRunnerBackEnd): + """ + Runner for running jobs on LSF using a particular interface called jbsub. + """ + + def __init__(self, jbsub_params: dict, log_dir: Path, job_name: str): + self._jobid = "" + self._jbsub_prefix = self.build_jbsub_cmd(jbsub_params, log_dir, job_name) + + @staticmethod + def build_jbsub_cmd(cluster_params: dict, log_dir: Path, job_name="") -> list[str]: + timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT-%H-%M-%SZ") + + jbsub = ["jbsub"] + jbsub.extend(["-q", cluster_params["queue"]]) + jbsub.extend(["-cores", f"{cluster_params['cpus']}+{cluster_params['gpus']}"]) + jbsub.extend(["-mem", f"{cluster_params['mem_gb']}G"]) + if cluster_params["require"]: + jbsub.extend(["-require", cluster_params["require"]]) + jbsub.extend(["-out", str(log_dir / f"{timestamp}_{job_name}_job_%J_stdout.log")]) # %J is the jobid + jbsub.extend(["-err", str(log_dir / f"{timestamp}_{job_name}_job_%J_stderr.log")]) # %J is the jobid + return jbsub + + def _submit_job(self, cmd: list[str], verbose=True): + """Submit a job in a cluster or a background process. + + :param cmd: List of strings that, when concatenated, form a linux command, e.g. ["echo", "'hello world'"] + :param verbose: Print additional data about the job, defaults to True + """ + self._jobid, _ = jbsub_tools.submit_job( + jbsub_prefix=self._jbsub_prefix, + cmd=cmd, + verbose=verbose + ) + + def _wait_for_job(self, verbose=True): + """Block until the submitted running job completes. + + :param verbose: Print periodic information about the job status, defaults to True + """ + jbsub_tools.wait_for_job(self._jobid, verbose=True) + + +class JobRunner: + """ + Run jobs in a variety of job runners + """ + + def __init__(self, job_params: dict, log_dir: Path, job_name: str): + job_runner = job_params["job_runner"] + if job_runner == "jbsub": + self.backend = JbsubJobRunner(job_params, log_dir, job_name) + elif job_runner == "tmux": + self.backend = TmuxJobRunner(job_params, log_dir, job_name) + else: + raise NotImplementedError(f"No support for job runner: {job_runner}") + + def run_cmd_in_background(self, cmd: list[Any], verbose=True): + """Submit a job as a background process. + + :param cmd: List of string-like objects that, when concatenated, + form a linux command, e.g. ["echo", "'hello world'"] + :param verbose: Print additional data about the job, defaults to True + """ + cmd = [str(c) for c in cmd] + self.backend._submit_job(cmd, verbose=verbose) + + def run_cmd_and_block(self, cmd: list[Any], verbose=True): + """Submit a job as a process and block until it is complete. + + (This is most useful if the job needs to run on a compute node in a cluster.) + + :param cmd: List of string-like objects that, when concatenated, + form a linux command, e.g. 
["echo", "'hello world'"] + :param verbose: Print additional data about the job, defaults to True + """ + cmd = [str(c) for c in cmd] + self.backend._submit_job(cmd, verbose=verbose) + self.backend._wait_for_job(verbose=verbose) \ No newline at end of file diff --git a/scripts/pipeline/scripts/run.sh b/scripts/pipeline/scripts/run.sh new file mode 100644 index 00000000..a14de2f4 --- /dev/null +++ b/scripts/pipeline/scripts/run.sh @@ -0,0 +1,5 @@ +mkdir ../logs +python -u 1_initialize_script.py 2>&1 | tee ../logs/$(date -u +"%Y-%m-%dT-%H-%M-%SZ")_1_initialize.log +python -u 2_train_script.py 2>&1 | tee ../logs/$(date -u +"%Y-%m-%dT-%H-%M-%SZ")_2_train.log +python -u 3_benchmark_script.py 2>&1 | tee ../logs/$(date -u +"%Y-%m-%dT-%H-%M-%SZ")_3_benchmark.log +rm -r __pycache__ \ No newline at end of file diff --git a/scripts/pipeline/scripts/tmux_run.sh b/scripts/pipeline/scripts/tmux_run.sh new file mode 100644 index 00000000..09384ef0 --- /dev/null +++ b/scripts/pipeline/scripts/tmux_run.sh @@ -0,0 +1 @@ +tmux new-session -d -s $(basename $(dirname "$PWD")) "sh run.sh" \ No newline at end of file diff --git a/scripts/pipeline/scripts/train_script.py b/scripts/pipeline/scripts/train_script.py new file mode 100644 index 00000000..cbd0540d --- /dev/null +++ b/scripts/pipeline/scripts/train_script.py @@ -0,0 +1,233 @@ +""" +Script governing all training runs + +Compute intensive code to be run on a compute node +""" +import argparse +from dataclasses import dataclass +import os +from pathlib import Path +import shutil +import tempfile +from typing import Any +import yaml + + +@dataclass +class BaseTrainer: + round: int + conda_env_path: Path + paramsdir: Path + datadir: Path + resultsdir: Path + workdir: Path + model_params: dict[str, Any] + + def save_params_and_build_train_cmd(self) -> list[Any]: + raise NotImplemented + + def train_in_conda(self) -> int: + # print gpu information for debugging + os.system("nvidia-smi") + + train_cmd = self.save_params_and_build_train_cmd() + + # to run something in conda, put it in a script to set the env variables + script = [] + script.append("export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$CONDA_PREFIX/lib") + script.append("") + script.append(" ".join(str(c) for c in train_cmd)) + script.append("") + + script = "\n".join(script) + print(script) + + script_sh = self.workdir / "run_training.sh" + script_sh.write_text(script) + + conda_run_prefix = f"conda run -p {self.conda_env_path} --live-stream" + exit_code = os.system(f"{conda_run_prefix} sh {script_sh}") + return exit_code + + def cleanup(self): + raise NotImplemented + + +class HModelTrainer(BaseTrainer): + def save_params_and_build_train_cmd(self) -> list[Any]: + cmd = [] + cmd.extend(["g2t-train-hmodel", self.datadir]) + cmd.extend(["--max_subgraph_size", self.model_params["max_subgraph_size"]]) + if self.model_params["with_context"]: + cmd.extend(["--with_context"]) + return cmd + + def cleanup(self): + # move the results + # There would be at most one hmodel file in the workdir. + # Move it to its own directory undert the results dir. 
+ hmodel_file = self.workdir / "hmodel.sav" + if hmodel_file.exists(): + model_dir = self.resultsdir / "hmodel" + model_dir.mkdir(exist_ok=True) + new_hmodel_file = model_dir / "hmodel.sav" + hmodel_file.rename(new_hmodel_file) + print(f"hmodel model moved to {model_dir}") + + +class TF2Trainer(BaseTrainer): + + def save_params_and_build_train_cmd(self) -> list[Any]: + cmd = [] + + # yaml params + tf2_param_file = self.paramsdir / "tf2_params.yml" + with tf2_param_file.open(mode="w") as f: + yaml.dump(self.model_params, f) + cmd.extend(["g2t-train", self.datadir, tf2_param_file]) + + # other arguments + cmd.extend(["--work-dir", self.workdir]) + cmd.extend(["--output-dir", self.resultsdir]) + cmd.extend(["--logging-level", "INFO"]) + + # See if there is already a checkpoint. If so, then restart from that. + weights_dir = self.resultsdir / "weights" + if weights_dir.exists(): + checkpoints_epochs = [ + int(cp.name.split("epoch")[1]) + for cp in weights_dir.iterdir() + if cp.is_dir() and cp.name.startswith("checkpoint__epoch") + ] + if checkpoints_epochs: + epoch = max(checkpoints_epochs) + print(f"Found checkpoint epoch {epoch}. Continuing from that checkpoint.") + print() + cmd.extend(["--from-checkpoint", epoch]) + + return cmd + + def cleanup(self): + pass + + +class TFGNNTrainer(BaseTrainer): + + def save_params_and_build_train_cmd(self) -> list[Any]: + cmd = [] + # train the tfgnn model + cmd.extend(["g2t-train-tfgnn"]) + cmd.extend(["--data-dir", self.datadir]) + + for config_key in ["dataset", "prediction_task", "definition_task", "trainer", "run"]: + # save config as a yaml file to the params directory + config_yml = self.paramsdir / (config_key + ".yml") + with config_yml.open(mode="w") as f: + yaml.dump(self.model_params[config_key], f) + # set cmd arg + cmd.extend([f"--{config_key.replace('_', '-')}-config", config_yml]) + + cmd.extend(["--log", self.resultsdir]) + cmd.extend(["--gpu", "all"]) + + return cmd + + def cleanup(self): + pass + + +def copy_data_to_tmp_dir(datadir: Path, tmpdir: Path) -> Path: + assert datadir.is_dir(), datadir + assert tmpdir.is_dir(), tmpdir + new_datadir = tmpdir / datadir.name + shutil.copytree(datadir, tmpdir / datadir.name) + return new_datadir + +def read_args() -> argparse.Namespace: + parser = argparse.ArgumentParser() + parser.add_argument( + "--model-arch", + type=str, + help="Model architecture: tfgnn, tf2, hmodel" + ) + parser.add_argument( + "--model-params", + type=Path, + help="YAML parameter file for the model" + ) + parser.add_argument( + "--conda_env_path", + type=Path, + help="Location of conda environment with model training code loaded" + ) + parser.add_argument( + "--round", + type=int, + help="Number of training restarts (starting at zero)" + ) + parser.add_argument( + "--datadir", + type=Path, + help="Location of dataset" + ) + parser.add_argument( + "--workdir", + type=Path, + help="Directory to run from (files won't be saved)" + ) + parser.add_argument( + "--resultsdir", + type=Path, + help="Location to store trained models and other results" + ) + parser.add_argument( + "--paramsdir", + type=Path, + help="Location to store parameter yaml files and other communication files" + ) + args = parser.parse_args() + + # check inputs + assert args.model_params.is_file(), f"{args.params} must be an existing file" + assert str(args.model_params).endswith(".yaml"), f"{args.params} must be a .yaml file" + + return args + +def main(): + args = read_args() + + with Path(args.model_params).open() as f: + params = yaml.safe_load(f) + + with 
tempfile.TemporaryDirectory() as tmpdirname: + datadir = copy_data_to_tmp_dir(datadir=args.datadir, tmpdir=Path(tmpdirname)) + + if args.model_arch == "hmodel": + Trainer = HModelTrainer + elif args.model_arch == "tf2": + Trainer = TF2Trainer + elif args.model_arch == "tfgnn": + Trainer = TFGNNTrainer + else: + raise Exception(f"Unknown model arch: {args.model_arch}") + + trainer = Trainer( + round=args.round, + conda_env_path=args.conda_env_path, + paramsdir=args.paramsdir, + datadir=datadir, + resultsdir=args.resultsdir, + workdir=args.workdir, + model_params=params + ) + exit_code = trainer.train_in_conda() + trainer.cleanup() + + if exit_code: + print() + print(f"Training did not exit properly. Exit code: {exit_code}") + exit(exit_code) + + +if __name__ == "__main__": + main() diff --git a/scripts/pipeline/scripts/utils.py b/scripts/pipeline/scripts/utils.py new file mode 100644 index 00000000..76bcecb0 --- /dev/null +++ b/scripts/pipeline/scripts/utils.py @@ -0,0 +1,65 @@ +import os +from pathlib import Path +import yaml + +class Utils: + @staticmethod + def setup_conda(conda_env_path: Path, conda_params: dict): + print() + print("========") + print("Setting up conda env") + print("========") + print() + os.system(f"conda create -y -p {conda_env_path} python={conda_params['python']}") + for cmd in conda_params["setup_cmds"]: + os.system(f"conda run -p {conda_env_path} {cmd}") + assert conda_env_path.exists() + + @staticmethod + def setup_opam(conda_run_prefix: str, opam_switch_path: Path, opam_params: dict): + print() + print("========") + print("Setting up opam switch") + print("========") + print() + os.system(f"{conda_run_prefix} opam switch create {opam_switch_path} --empty") + + # to run opam commands, make a temporary script and run that + install_script_sh = opam_switch_path / "setup_opam.sh" + script = [f"eval $(opam env --switch={opam_switch_path})"] + for cmd in opam_params["setup_cmds"]: + script.append(cmd) + install_script_sh.write_text("\n".join(script)) + + os.system(f"{conda_run_prefix} sh {install_script_sh}") + assert (opam_switch_path / "_opam").exists() + + @staticmethod + def get_run_dir_name(): + run_dir_name = str(Path(".").resolve().parent.name) # parent directory + assert run_dir_name.startswith("20"), ( + f"Not run from expected directory structure.\n" + f"wkdir: {Path('.').resolve()}\n" + f"Unexpected parent:{run_dir_name}" + ) + return run_dir_name + + @staticmethod + def get_params(): + with Path("../params.yaml").open() as f: + params = yaml.safe_load(f) + return params + + @staticmethod + def get_directories() -> dict[str, Path]: + directories = {} + directories["logs"] = Path("../logs").resolve() + directories["scripts"] = Path("../scripts").resolve() + directories["params"] = Path("../params").resolve() + directories["results"] = Path("../results").resolve() + directories["workdir"] = Path("../workdir").resolve() + + for key, path in directories.items(): + assert path.exists(), f"{key} directory doesn't exist: {path}" + + return directories \ No newline at end of file