diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index a8fec259..da3543d5 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -52,7 +52,7 @@ jobs: # Integration tests do require external systems to be running (most commonly a database instance). # Unlike end-to-end tests though, they test a specific module in a detailed manner, much like a unit test does. env: - # We set `INTENDED_DBDATA_HARDWARE` so that it's seen when `integtest_pg_conn.py` executes `_set_up_gymlib_integtest_workspace.sh`. + # The CI runs on ssd so we have to set this. INTENDED_DBDATA_HARDWARE: ssd run: | . "$HOME/.cargo/env" diff --git a/benchmark/cli.py b/benchmark/cli.py index 1cf63418..45943a16 100644 --- a/benchmark/cli.py +++ b/benchmark/cli.py @@ -8,7 +8,7 @@ @click.group(name="benchmark") @click.pass_obj def benchmark_group(dbgym_workspace: DBGymWorkspace) -> None: - dbgym_workspace.append_group("benchmark") + pass benchmark_group.add_command(tpch_group) diff --git a/benchmark/job/cli.py b/benchmark/job/cli.py index 84df02ef..066fa3cc 100644 --- a/benchmark/job/cli.py +++ b/benchmark/job/cli.py @@ -2,17 +2,17 @@ from typing import Optional import click +from gymlib.symlinks_paths import ( + get_tables_dirname, + get_workload_dirname, + get_workload_suffix, + name_to_linkname, +) from benchmark.constants import DEFAULT_SCALE_FACTOR from util.log import DBGYM_LOGGER_NAME from util.shell import subprocess_run -from util.workspace import ( - DBGymWorkspace, - get_default_tables_dname, - get_workload_name, - is_fully_resolved, - link_result, -) +from util.workspace import DBGymWorkspace, fully_resolve_path JOB_TABLES_URL = "https://event.cwi.nl/da/job/imdb.tgz" JOB_QUERIES_URL = "https://event.cwi.nl/da/job/job.tgz" @@ -137,18 +137,22 @@ @click.group(name="job") @click.pass_obj def job_group(dbgym_workspace: DBGymWorkspace) -> None: - dbgym_workspace.append_group("job") + pass -@job_group.command(name="data") +@job_group.command(name="tables") # We expose this option to keep its interface consistent with other workloads, but you should never pass in something other than DEFAULT_SCALE_FACTOR. @click.argument("scale-factor", type=float) @click.pass_obj -# The reason generate data is separate from create dbdata is because generate-data is generic +# The reason generate data is separate from create dbdata is because generate data is generic # to all DBMSs while create dbdata is specific to a single DBMS. 
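Aside (illustrative sketch, not part of the change): the hunks in this PR repeatedly apply the same idempotency idiom when migrating to the new workspace API — check for the expected symlink, produce the artifact inside the current run directory, and only link it once it is complete, so a symlink never points at a partial result. A minimal sketch of that idiom, assuming the `DBGymWorkspace` members used throughout this diff (`dbgym_cur_symlinks_path`, `dbgym_this_run_path`, `link_result`); `produce_fn` is a hypothetical callback:

```python
from pathlib import Path
from typing import Any, Callable


def produce_once(workspace: Any, dirname: str, produce_fn: Callable[[Path], None]) -> Path:
    # Skip the work entirely if a previous run already produced and linked this artifact.
    expected_symlink = workspace.dbgym_cur_symlinks_path / f"{dirname}.link"
    if expected_symlink.exists():
        return expected_symlink
    # Produce into this run's directory...
    real_path = workspace.dbgym_this_run_path / dirname
    real_path.mkdir(parents=False, exist_ok=False)
    produce_fn(real_path)  # e.g. download + untar, or generate tables/queries
    # ...and only link at the end so the symlink only ever points to a complete result.
    return workspace.link_result(real_path)
```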
-def job_data(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None: +def job_tables(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None: + _job_tables(dbgym_workspace, scale_factor) + + +def _job_tables(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None: assert scale_factor == DEFAULT_SCALE_FACTOR - _download_job_data(dbgym_workspace) + _download_job_tables(dbgym_workspace) @job_group.command(name="workload") @@ -161,18 +165,24 @@ def job_data(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None: @click.pass_obj def job_workload( dbgym_workspace: DBGymWorkspace, query_subset: str, scale_factor: float +) -> None: + _job_workload(dbgym_workspace, query_subset, scale_factor) + + +def _job_workload( + dbgym_workspace: DBGymWorkspace, query_subset: str, scale_factor: float ) -> None: assert scale_factor == DEFAULT_SCALE_FACTOR _download_job_queries(dbgym_workspace) _generate_job_workload(dbgym_workspace, query_subset) -def _download_job_data(dbgym_workspace: DBGymWorkspace) -> None: +def _download_job_tables(dbgym_workspace: DBGymWorkspace) -> None: _download_and_untar_dir( dbgym_workspace, JOB_TABLES_URL, "imdb.tgz", - get_default_tables_dname(DEFAULT_SCALE_FACTOR), + get_tables_dirname("job", DEFAULT_SCALE_FACTOR), ) @@ -199,51 +209,66 @@ def _download_and_untar_dir( an "original" directory name. If this is the case, you should set `untarred_original_dname` to ensure that it gets renamed to `untarred_dname`. """ - expected_symlink_dpath = ( - dbgym_workspace.cur_symlinks_data_path(mkdir=True) / f"{untarred_dname}.link" + expected_symlink_path = ( + dbgym_workspace.dbgym_cur_symlinks_path / f"{untarred_dname}.link" ) - if expected_symlink_dpath.exists(): + if expected_symlink_path.exists(): logging.getLogger(DBGYM_LOGGER_NAME).info( - f"Skipping download: {expected_symlink_dpath}" + f"Skipping download: {expected_symlink_path}" ) return - logging.getLogger(DBGYM_LOGGER_NAME).info(f"Downloading: {expected_symlink_dpath}") - real_data_path = dbgym_workspace.cur_task_runs_data_path(mkdir=True) - subprocess_run(f"curl -O {download_url}", cwd=real_data_path) - untarred_data_dpath = dbgym_workspace.cur_task_runs_data_path(untarred_dname) + logging.getLogger(DBGYM_LOGGER_NAME).info(f"Downloading: {expected_symlink_path}") + subprocess_run(f"curl -O {download_url}", cwd=dbgym_workspace.dbgym_this_run_path) + untarred_data_path = dbgym_workspace.dbgym_this_run_path / untarred_dname if untarred_original_dname is not None: - assert not untarred_data_dpath.exists() - subprocess_run(f"tar -zxvf {download_tarred_fname}", cwd=real_data_path) - assert (real_data_path / untarred_original_dname).exists() + assert not untarred_data_path.exists() + subprocess_run( + f"tar -zxvf {download_tarred_fname}", + cwd=dbgym_workspace.dbgym_this_run_path, + ) + assert (dbgym_workspace.dbgym_this_run_path / untarred_original_dname).exists() subprocess_run( - f"mv {untarred_original_dname} {untarred_dname}", cwd=real_data_path + f"mv {untarred_original_dname} {untarred_dname}", + cwd=dbgym_workspace.dbgym_this_run_path, ) else: - untarred_data_dpath.mkdir(parents=True, exist_ok=False) - subprocess_run(f"tar -zxvf ../{download_tarred_fname}", cwd=untarred_data_dpath) + untarred_data_path.mkdir(parents=True, exist_ok=False) + subprocess_run(f"tar -zxvf ../{download_tarred_fname}", cwd=untarred_data_path) - assert untarred_data_dpath.exists() - subprocess_run(f"rm {download_tarred_fname}", cwd=real_data_path) - symlink_dpath = link_result(dbgym_workspace, untarred_data_dpath) - assert 
expected_symlink_dpath.samefile(symlink_dpath) - logging.getLogger(DBGYM_LOGGER_NAME).info(f"Downloaded: {expected_symlink_dpath}") + assert untarred_data_path.exists() + subprocess_run( + f"rm {download_tarred_fname}", cwd=dbgym_workspace.dbgym_this_run_path + ) + symlink_path = dbgym_workspace.link_result(untarred_data_path) + assert expected_symlink_path.samefile(symlink_path) + logging.getLogger(DBGYM_LOGGER_NAME).info(f"Downloaded: {expected_symlink_path}") def _generate_job_workload( dbgym_workspace: DBGymWorkspace, query_subset: str, ) -> None: - workload_name = get_workload_name(DEFAULT_SCALE_FACTOR, query_subset) - expected_workload_symlink_dpath = dbgym_workspace.cur_symlinks_data_path( - mkdir=True - ) / (workload_name + ".link") + workload_name = get_workload_dirname( + "job", + DEFAULT_SCALE_FACTOR, + get_workload_suffix("job", query_subset=query_subset), + ) + expected_workload_symlink_path = dbgym_workspace.dbgym_cur_symlinks_path / ( + name_to_linkname(workload_name) + ) + if expected_workload_symlink_path.exists(): + logging.getLogger(DBGYM_LOGGER_NAME).info( + f"Skipping generation: {expected_workload_symlink_path}" + ) + return logging.getLogger(DBGYM_LOGGER_NAME).info( - f"Generating: {expected_workload_symlink_dpath}" + f"Generating: {expected_workload_symlink_path}" ) - real_dpath = dbgym_workspace.cur_task_runs_data_path(workload_name, mkdir=True) + workload_path = dbgym_workspace.dbgym_this_run_path / workload_name + workload_path.mkdir(parents=False, exist_ok=False) query_names = None if query_subset == "all": @@ -255,19 +280,17 @@ def _generate_job_workload( else: assert False - with open(real_dpath / "order.txt", "w") as f: + with open(workload_path / "order.txt", "w") as f: + queries_parent_path = dbgym_workspace.dbgym_cur_symlinks_path / ( + name_to_linkname(JOB_QUERIES_DNAME) + ) + for qname in query_names: - sql_fpath = ( - dbgym_workspace.cur_symlinks_data_path(mkdir=True) - / (f"{JOB_QUERIES_DNAME}.link") - ).resolve() / f"{qname}.sql" - assert is_fully_resolved( - sql_fpath - ), "We should only write existent real absolute paths to a file" - f.write(f"Q{qname},{sql_fpath}\n") + sql_path = fully_resolve_path(queries_parent_path / f"{qname}.sql") + f.write(f"Q{qname},{sql_path}\n") - workload_symlink_dpath = link_result(dbgym_workspace, real_dpath) - assert workload_symlink_dpath == expected_workload_symlink_dpath + workload_symlink_path = dbgym_workspace.link_result(workload_path) + assert workload_symlink_path == expected_workload_symlink_path logging.getLogger(DBGYM_LOGGER_NAME).info( - f"Generated: {expected_workload_symlink_dpath}" + f"Generated: {expected_workload_symlink_path}" ) diff --git a/benchmark/job/load_info.py b/benchmark/job/load_info.py index e205847d..8ea28f8a 100644 --- a/benchmark/job/load_info.py +++ b/benchmark/job/load_info.py @@ -1,16 +1,16 @@ from pathlib import Path from typing import Optional +from gymlib.symlinks_paths import get_tables_symlink_path + from benchmark.constants import DEFAULT_SCALE_FACTOR from dbms.load_info_base_class import LoadInfoBaseClass -from util.workspace import DBGymWorkspace, get_default_tables_dname, is_fully_resolved +from util.workspace import DBGymWorkspace, fully_resolve_path JOB_SCHEMA_FNAME = "job_schema.sql" class JobLoadInfo(LoadInfoBaseClass): - CODEBASE_PATH_COMPONENTS = ["dbgym", "benchmark", "job"] - CODEBASE_DNAME = "_".join(CODEBASE_PATH_COMPONENTS) TABLES = [ "aka_name", "aka_title", @@ -36,43 +36,34 @@ class JobLoadInfo(LoadInfoBaseClass): ] def __init__(self, dbgym_workspace: 
DBGymWorkspace): - # schema and constraints - schema_root_dpath = dbgym_workspace.base_dbgym_repo_dpath - for component in JobLoadInfo.CODEBASE_PATH_COMPONENTS[ - 1: - ]: # [1:] to skip "dbgym" - schema_root_dpath /= component - self._schema_fpath = schema_root_dpath / JOB_SCHEMA_FNAME + # Schema (directly in the codebase). + job_codebase_path = dbgym_workspace.base_dbgym_repo_path / "benchmark" / "job" + self._schema_path = job_codebase_path / JOB_SCHEMA_FNAME assert ( - self._schema_fpath.exists() - ), f"self._schema_fpath ({self._schema_fpath}) does not exist" + self._schema_path.exists() + ), f"self._schema_path ({self._schema_path}) does not exist" # Tables - data_root_dpath = ( - dbgym_workspace.dbgym_symlinks_path / JobLoadInfo.CODEBASE_DNAME / "data" - ) - tables_symlink_dpath = ( - data_root_dpath / f"{get_default_tables_dname(DEFAULT_SCALE_FACTOR)}.link" + tables_path = fully_resolve_path( + get_tables_symlink_path( + dbgym_workspace.dbgym_workspace_path, "job", DEFAULT_SCALE_FACTOR + ) ) - tables_dpath = tables_symlink_dpath.resolve() - assert is_fully_resolved( - tables_dpath - ), f"tables_dpath ({tables_dpath}) should be an existent real absolute path. Make sure you have generated the TPC-H data" - self._tables_and_fpaths = [] + self._tables_and_paths = [] for table in JobLoadInfo.TABLES: - table_fpath = tables_dpath / f"{table}.csv" - self._tables_and_fpaths.append((table, table_fpath)) + table_path = tables_path / f"{table}.csv" + self._tables_and_paths.append((table, table_path)) - def get_schema_fpath(self) -> Path: - return self._schema_fpath + def get_schema_path(self) -> Path: + return self._schema_path - def get_tables_and_fpaths(self) -> list[tuple[str, Path]]: - return self._tables_and_fpaths + def get_tables_and_paths(self) -> list[tuple[str, Path]]: + return self._tables_and_paths def get_table_file_delimiter(self) -> str: return "," - def get_constraints_fpath(self) -> Optional[Path]: + def get_constraints_path(self) -> Optional[Path]: # JOB does not have any constraints. It does have indexes, but we don't want to create # those indexes so that the tuning agent can start from a clean slate. return None diff --git a/benchmark/tests/__init__.py b/benchmark/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/benchmark/tests/benchmark_integtest_dbgym_config.yaml b/benchmark/tests/benchmark_integtest_dbgym_config.yaml new file mode 100644 index 00000000..8a1b1cca --- /dev/null +++ b/benchmark/tests/benchmark_integtest_dbgym_config.yaml @@ -0,0 +1 @@ +dbgym_workspace_path: ../dbgym_benchmark_integtest_workspace/ diff --git a/benchmark/tests/integtest_benchmark.py b/benchmark/tests/integtest_benchmark.py new file mode 100644 index 00000000..b4f4fdbc --- /dev/null +++ b/benchmark/tests/integtest_benchmark.py @@ -0,0 +1,99 @@ +import shutil +import unittest +from pathlib import Path + +from gymlib.symlinks_paths import ( + get_tables_symlink_path, + get_workload_suffix, + get_workload_symlink_path, +) + +# It's ok to import private functions from the benchmark module because this is an integration test. 
+from benchmark.constants import DEFAULT_SCALE_FACTOR +from benchmark.job.cli import _job_tables, _job_workload +from benchmark.tpch.cli import _tpch_tables, _tpch_workload +from benchmark.tpch.constants import DEFAULT_TPCH_SEED +from util.workspace import ( + DBGymWorkspace, + fully_resolve_path, + get_workspace_path_from_config, +) + + +class BenchmarkTests(unittest.TestCase): + DBGYM_CONFIG_PATH = Path("benchmark/tests/benchmark_integtest_dbgym_config.yaml") + + def setUp(self) -> None: + workspace_path = get_workspace_path_from_config( + BenchmarkTests.DBGYM_CONFIG_PATH + ) + # Get a clean start each time. + if workspace_path.exists(): + shutil.rmtree(workspace_path) + + # Reset this to avoid the error of it being created twice. + # In real usage, the second run would be a different Python process so DBGymWorkspace._num_times_created_this_run would be 0. + DBGymWorkspace._num_times_created_this_run = 0 + self.workspace = DBGymWorkspace(workspace_path) + + def tearDown(self) -> None: + if self.workspace.dbgym_workspace_path.exists(): + shutil.rmtree(self.workspace.dbgym_workspace_path) + + def test_tpch_tables(self) -> None: + scale_factor = 0.01 + tables_path = get_tables_symlink_path( + self.workspace.dbgym_workspace_path, "tpch", scale_factor + ) + self.assertFalse(tables_path.exists()) + _tpch_tables(self.workspace, scale_factor) + self.assertTrue(tables_path.exists()) + self.assertTrue(fully_resolve_path(tables_path).exists()) + + def test_job_tables(self) -> None: + tables_path = get_tables_symlink_path( + self.workspace.dbgym_workspace_path, "job", DEFAULT_SCALE_FACTOR + ) + self.assertFalse(tables_path.exists()) + _job_tables(self.workspace, DEFAULT_SCALE_FACTOR) + self.assertTrue(tables_path.exists()) + self.assertTrue(fully_resolve_path(tables_path).exists()) + + def test_tpch_workload(self) -> None: + scale_factor = 0.01 + workload_path = get_workload_symlink_path( + self.workspace.dbgym_workspace_path, + "tpch", + scale_factor, + get_workload_suffix( + "tpch", + seed_start=DEFAULT_TPCH_SEED, + seed_end=DEFAULT_TPCH_SEED, + query_subset="all", + ), + ) + self.assertFalse(workload_path.exists()) + _tpch_workload( + self.workspace, DEFAULT_TPCH_SEED, DEFAULT_TPCH_SEED, "all", scale_factor + ) + self.assertTrue(workload_path.exists()) + self.assertTrue(fully_resolve_path(workload_path).exists()) + + def test_job_workload(self) -> None: + workload_path = get_workload_symlink_path( + self.workspace.dbgym_workspace_path, + "job", + DEFAULT_SCALE_FACTOR, + get_workload_suffix( + "job", + query_subset="all", + ), + ) + self.assertFalse(workload_path.exists()) + _job_workload(self.workspace, "all", DEFAULT_SCALE_FACTOR) + self.assertTrue(workload_path.exists()) + self.assertTrue(fully_resolve_path(workload_path).exists()) + + +if __name__ == "__main__": + unittest.main() diff --git a/benchmark/tpch/cli.py b/benchmark/tpch/cli.py index eea9dc5c..c83fae75 100644 --- a/benchmark/tpch/cli.py +++ b/benchmark/tpch/cli.py @@ -1,36 +1,46 @@ import logging -from pathlib import Path import click +from gymlib.symlinks_paths import ( + get_scale_factor_string, + get_tables_dirname, + get_tables_symlink_path, + get_workload_suffix, + get_workload_symlink_path, + linkname_to_name, + name_to_linkname, +) from benchmark.constants import DEFAULT_SCALE_FACTOR from benchmark.tpch.constants import DEFAULT_TPCH_SEED, NUM_TPCH_QUERIES from util.log import DBGYM_LOGGER_NAME from util.shell import subprocess_run -from util.workspace import ( - DBGymWorkspace, - get_default_tables_dname, - 
get_scale_factor_string, - get_workload_name, - is_fully_resolved, - link_result, -) +from util.workspace import DBGymWorkspace, fully_resolve_path, is_fully_resolved + +TPCH_KIT_DIRNAME = "tpch-kit" @click.group(name="tpch") @click.pass_obj def tpch_group(dbgym_workspace: DBGymWorkspace) -> None: - dbgym_workspace.append_group("tpch") + pass -@tpch_group.command(name="data") +@tpch_group.command(name="tables") @click.argument("scale-factor", type=float) @click.pass_obj -# The reason generate data is separate from create dbdata is because generate-data is generic +# The reason generate tables is separate from create dbdata is because tpch_tables is generic # to all DBMSs while create dbdata is specific to a single DBMS. -def tpch_data(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None: +def tpch_tables(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None: + _tpch_tables(dbgym_workspace, scale_factor) + + +def _tpch_tables(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None: + """ + This function exists as a hook for integration tests. + """ _clone_tpch_kit(dbgym_workspace) - _generate_data(dbgym_workspace, scale_factor) + _generate_tpch_tables(dbgym_workspace, scale_factor) @tpch_group.command(name="workload") @@ -60,6 +70,19 @@ def tpch_workload( query_subset: str, scale_factor: float, ) -> None: + _tpch_workload(dbgym_workspace, seed_start, seed_end, query_subset, scale_factor) + + +def _tpch_workload( + dbgym_workspace: DBGymWorkspace, + seed_start: int, + seed_end: int, + query_subset: str, + scale_factor: float, +) -> None: + """ + This function exists as a hook for integration tests. + """ assert ( seed_start <= seed_end ), f"seed_start ({seed_start}) must be <= seed_end ({seed_end})" @@ -70,95 +93,94 @@ def tpch_workload( ) -def _get_queries_dname(seed: int, scale_factor: float) -> str: +def _get_queries_dirname(seed: int, scale_factor: float) -> str: return f"queries_{seed}_sf{get_scale_factor_string(scale_factor)}" def _clone_tpch_kit(dbgym_workspace: DBGymWorkspace) -> None: - expected_symlink_dpath = ( - dbgym_workspace.cur_symlinks_build_path(mkdir=True) / "tpch-kit.link" + expected_symlink_path = dbgym_workspace.dbgym_cur_symlinks_path / ( + name_to_linkname(TPCH_KIT_DIRNAME) ) - if expected_symlink_dpath.exists(): + if expected_symlink_path.exists(): logging.getLogger(DBGYM_LOGGER_NAME).info( - f"Skipping clone: {expected_symlink_dpath}" + f"Skipping clone: {expected_symlink_path}" ) return - logging.getLogger(DBGYM_LOGGER_NAME).info(f"Cloning: {expected_symlink_dpath}") - real_build_path = dbgym_workspace.cur_task_runs_build_path() + logging.getLogger(DBGYM_LOGGER_NAME).info(f"Cloning: {expected_symlink_path}") subprocess_run( - f"./clone_tpch_kit.sh {real_build_path}", cwd=dbgym_workspace.cur_source_path() + f"./clone_tpch_kit.sh {dbgym_workspace.dbgym_this_run_path}", + cwd=dbgym_workspace.base_dbgym_repo_path / "benchmark" / "tpch", ) - symlink_dpath = link_result(dbgym_workspace, real_build_path / "tpch-kit") - assert expected_symlink_dpath.samefile(symlink_dpath) - logging.getLogger(DBGYM_LOGGER_NAME).info(f"Cloned: {expected_symlink_dpath}") - - -def _get_tpch_kit_dpath(dbgym_workspace: DBGymWorkspace) -> Path: - tpch_kit_dpath = ( - dbgym_workspace.cur_symlinks_build_path() / "tpch-kit.link" - ).resolve() - assert is_fully_resolved(tpch_kit_dpath) - return tpch_kit_dpath + symlink_path = dbgym_workspace.link_result( + dbgym_workspace.dbgym_this_run_path / TPCH_KIT_DIRNAME + ) + assert expected_symlink_path.samefile(symlink_path) + 
logging.getLogger(DBGYM_LOGGER_NAME).info(f"Cloned: {expected_symlink_path}") def _generate_tpch_queries( dbgym_workspace: DBGymWorkspace, seed_start: int, seed_end: int, scale_factor: float ) -> None: - tpch_kit_dpath = _get_tpch_kit_dpath(dbgym_workspace) - data_path = dbgym_workspace.cur_symlinks_data_path(mkdir=True) + tpch_kit_path = dbgym_workspace.dbgym_cur_symlinks_path / ( + name_to_linkname(TPCH_KIT_DIRNAME) + ) logging.getLogger(DBGYM_LOGGER_NAME).info( - f"Generating queries: {data_path} [{seed_start}, {seed_end}]" + f"Generating queries: [{seed_start}, {seed_end}]" ) for seed in range(seed_start, seed_end + 1): - expected_queries_symlink_dpath = data_path / ( - _get_queries_dname(seed, scale_factor) + ".link" + expected_queries_symlink_path = dbgym_workspace.dbgym_cur_symlinks_path / ( + name_to_linkname(_get_queries_dirname(seed, scale_factor)) ) - if expected_queries_symlink_dpath.exists(): + if expected_queries_symlink_path.exists(): continue - real_dir = dbgym_workspace.cur_task_runs_data_path( - _get_queries_dname(seed, scale_factor), mkdir=True + queries_parent_path = ( + dbgym_workspace.dbgym_this_run_path + / _get_queries_dirname(seed, scale_factor) ) + queries_parent_path.mkdir(parents=False, exist_ok=False) for i in range(1, NUM_TPCH_QUERIES + 1): - target_sql = (real_dir / f"{i}.sql").resolve() + target_sql = (queries_parent_path / f"{i}.sql").resolve() subprocess_run( f"DSS_QUERY=./queries ./qgen {i} -r {seed} -s {scale_factor} > {target_sql}", - cwd=tpch_kit_dpath / "dbgen", + cwd=tpch_kit_path / "dbgen", verbose=False, ) - queries_symlink_dpath = link_result(dbgym_workspace, real_dir) - assert queries_symlink_dpath.samefile(expected_queries_symlink_dpath) + queries_symlink_path = dbgym_workspace.link_result(queries_parent_path) + assert queries_symlink_path.samefile(expected_queries_symlink_path) logging.getLogger(DBGYM_LOGGER_NAME).info( - f"Generated queries: {data_path} [{seed_start}, {seed_end}]" + f"Generated queries: [{seed_start}, {seed_end}]" ) -def _generate_data(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None: - tpch_kit_dpath = _get_tpch_kit_dpath(dbgym_workspace) - data_path = dbgym_workspace.cur_symlinks_data_path(mkdir=True) - expected_tables_symlink_dpath = ( - data_path / f"{get_default_tables_dname(scale_factor)}.link" +def _generate_tpch_tables(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None: + tpch_kit_path = dbgym_workspace.dbgym_cur_symlinks_path / ( + name_to_linkname(TPCH_KIT_DIRNAME) + ) + expected_tables_symlink_path = get_tables_symlink_path( + dbgym_workspace.dbgym_workspace_path, "tpch", scale_factor ) - if expected_tables_symlink_dpath.exists(): + if expected_tables_symlink_path.exists(): logging.getLogger(DBGYM_LOGGER_NAME).info( - f"Skipping generation: {expected_tables_symlink_dpath}" + f"Skipping generation: {expected_tables_symlink_path}" ) return logging.getLogger(DBGYM_LOGGER_NAME).info( - f"Generating: {expected_tables_symlink_dpath}" + f"Generating: {expected_tables_symlink_path}" ) - subprocess_run(f"./dbgen -vf -s {scale_factor}", cwd=tpch_kit_dpath / "dbgen") - real_dir = dbgym_workspace.cur_task_runs_data_path( - get_default_tables_dname(scale_factor), mkdir=True + subprocess_run(f"./dbgen -vf -s {scale_factor}", cwd=tpch_kit_path / "dbgen") + tables_parent_path = dbgym_workspace.dbgym_this_run_path / get_tables_dirname( + "tpch", scale_factor ) - subprocess_run(f"mv ./*.tbl {real_dir}", cwd=tpch_kit_dpath / "dbgen") + tables_parent_path.mkdir(parents=False, exist_ok=False) + 
subprocess_run(f"mv ./*.tbl {tables_parent_path}", cwd=tpch_kit_path / "dbgen") - tables_symlink_dpath = link_result(dbgym_workspace, real_dir) - assert tables_symlink_dpath.samefile(expected_tables_symlink_dpath) + tables_symlink_path = dbgym_workspace.link_result(tables_parent_path) + assert tables_symlink_path.samefile(expected_tables_symlink_path) logging.getLogger(DBGYM_LOGGER_NAME).info( - f"Generated: {expected_tables_symlink_dpath}" + f"Generated: {expected_tables_symlink_path}" ) @@ -169,16 +191,27 @@ def _generate_tpch_workload( query_subset: str, scale_factor: float, ) -> None: - symlink_data_dpath = dbgym_workspace.cur_symlinks_data_path(mkdir=True) - workload_name = get_workload_name( - scale_factor, f"{seed_start}_{seed_end}_{query_subset}" + expected_workload_symlink_path = get_workload_symlink_path( + dbgym_workspace.dbgym_workspace_path, + "tpch", + scale_factor, + get_workload_suffix( + "tpch", seed_start=seed_start, seed_end=seed_end, query_subset=query_subset + ), ) - expected_workload_symlink_dpath = symlink_data_dpath / (workload_name + ".link") + if expected_workload_symlink_path.exists(): + logging.getLogger(DBGYM_LOGGER_NAME).info( + f"Skipping generation: {expected_workload_symlink_path}" + ) + return logging.getLogger(DBGYM_LOGGER_NAME).info( - f"Generating: {expected_workload_symlink_dpath}" + f"Generating: {expected_workload_symlink_path}" ) - real_dpath = dbgym_workspace.cur_task_runs_data_path(workload_name, mkdir=True) + workload_path = dbgym_workspace.dbgym_this_run_path / linkname_to_name( + expected_workload_symlink_path.name + ) + workload_path.mkdir(parents=False, exist_ok=False) query_names = None if query_subset == "all": @@ -190,21 +223,21 @@ def _generate_tpch_workload( else: assert False - with open(real_dpath / "order.txt", "w") as f: + with open(workload_path / "order.txt", "w") as f: for seed in range(seed_start, seed_end + 1): + queries_parent_path = dbgym_workspace.dbgym_cur_symlinks_path / ( + name_to_linkname(_get_queries_dirname(seed, scale_factor)) + ) + for qname in query_names: - sql_fpath = ( - symlink_data_dpath - / (_get_queries_dname(seed, scale_factor) + ".link") - ).resolve() / f"{qname}.sql" + sql_path = fully_resolve_path(queries_parent_path / f"{qname}.sql") assert is_fully_resolved( - sql_fpath + sql_path ), "We should only write existent real absolute paths to a file" - f.write(f"S{seed}-Q{qname},{sql_fpath}\n") - # TODO(WAN): add option to deep-copy the workload. 
+ f.write(f"S{seed}-Q{qname},{sql_path}\n") - workload_symlink_dpath = link_result(dbgym_workspace, real_dpath) - assert workload_symlink_dpath == expected_workload_symlink_dpath + workload_symlink_path = dbgym_workspace.link_result(workload_path) + assert workload_symlink_path == expected_workload_symlink_path logging.getLogger(DBGYM_LOGGER_NAME).info( - f"Generated: {expected_workload_symlink_dpath}" + f"Generated: {expected_workload_symlink_path}" ) diff --git a/benchmark/tpch/load_info.py b/benchmark/tpch/load_info.py index f9710b04..a717173c 100644 --- a/benchmark/tpch/load_info.py +++ b/benchmark/tpch/load_info.py @@ -1,18 +1,16 @@ from pathlib import Path from typing import Optional +from gymlib.symlinks_paths import get_tables_symlink_path + from dbms.load_info_base_class import LoadInfoBaseClass -from util.workspace import DBGymWorkspace, get_default_tables_dname, is_fully_resolved +from util.workspace import DBGymWorkspace, fully_resolve_path TPCH_SCHEMA_FNAME = "tpch_schema.sql" TPCH_CONSTRAINTS_FNAME = "tpch_constraints.sql" class TpchLoadInfo(LoadInfoBaseClass): - # currently, hardcoding the path seems like the easiest solution. If the path ever changes, it'll - # just break an integration test and we can fix it. I don't want to prematurely overengineer it - CODEBASE_PATH_COMPONENTS = ["dbgym", "benchmark", "tpch"] - CODEBASE_DNAME = "_".join(CODEBASE_PATH_COMPONENTS) TABLES = [ "region", "nation", @@ -25,45 +23,36 @@ class TpchLoadInfo(LoadInfoBaseClass): ] def __init__(self, dbgym_workspace: DBGymWorkspace, scale_factor: float): - # schema and constraints - schema_root_dpath = dbgym_workspace.base_dbgym_repo_dpath - for component in TpchLoadInfo.CODEBASE_PATH_COMPONENTS[ - 1: - ]: # [1:] to skip "dbgym" - schema_root_dpath /= component - self._schema_fpath = schema_root_dpath / TPCH_SCHEMA_FNAME + # Schema and constraints (directly in the codebase). + tpch_codebase_path = dbgym_workspace.base_dbgym_repo_path / "benchmark" / "tpch" + self._schema_path = tpch_codebase_path / TPCH_SCHEMA_FNAME assert ( - self._schema_fpath.exists() - ), f"self._schema_fpath ({self._schema_fpath}) does not exist" - self._constraints_fpath = schema_root_dpath / TPCH_CONSTRAINTS_FNAME + self._schema_path.exists() + ), f"self._schema_path ({self._schema_path}) does not exist" + self._constraints_path = tpch_codebase_path / TPCH_CONSTRAINTS_FNAME assert ( - self._constraints_fpath.exists() - ), f"self._constraints_fpath ({self._constraints_fpath}) does not exist" - - # tables - data_root_dpath = ( - dbgym_workspace.dbgym_symlinks_path / TpchLoadInfo.CODEBASE_DNAME / "data" - ) - tables_symlink_dpath = ( - data_root_dpath / f"{get_default_tables_dname(scale_factor)}.link" + self._constraints_path.exists() + ), f"self._constraints_path ({self._constraints_path}) does not exist" + + # Tables + tables_path = fully_resolve_path( + get_tables_symlink_path( + dbgym_workspace.dbgym_workspace_path, "tpch", scale_factor + ) ) - tables_dpath = tables_symlink_dpath.resolve() - assert is_fully_resolved( - tables_dpath - ), f"tables_dpath ({tables_dpath}) should be an existent real absolute path. 
Make sure you have generated the TPC-H data" - self._tables_and_fpaths = [] + self._tables_and_paths = [] for table in TpchLoadInfo.TABLES: - table_fpath = tables_dpath / f"{table}.tbl" - self._tables_and_fpaths.append((table, table_fpath)) + table_path = tables_path / f"{table}.tbl" + self._tables_and_paths.append((table, table_path)) - def get_schema_fpath(self) -> Path: - return self._schema_fpath + def get_schema_path(self) -> Path: + return self._schema_path - def get_tables_and_fpaths(self) -> list[tuple[str, Path]]: - return self._tables_and_fpaths + def get_tables_and_paths(self) -> list[tuple[str, Path]]: + return self._tables_and_paths def get_table_file_delimiter(self) -> str: return "|" - def get_constraints_fpath(self) -> Optional[Path]: - return self._constraints_fpath + def get_constraints_path(self) -> Optional[Path]: + return self._constraints_path diff --git a/dbms/cli.py b/dbms/cli.py index 6d97c5e2..2804b79b 100644 --- a/dbms/cli.py +++ b/dbms/cli.py @@ -7,7 +7,7 @@ @click.group(name="dbms") @click.pass_obj def dbms_group(dbgym_workspace: DBGymWorkspace) -> None: - dbgym_workspace.append_group("dbms") + pass dbms_group.add_command(postgres_group) diff --git a/dbms/load_info_base_class.py b/dbms/load_info_base_class.py index 847579d6..fca7d903 100644 --- a/dbms/load_info_base_class.py +++ b/dbms/load_info_base_class.py @@ -9,10 +9,10 @@ class LoadInfoBaseClass: copy the comments or type annotations or else they might become out of sync. """ - def get_schema_fpath(self) -> Path: + def get_schema_path(self) -> Path: raise NotImplementedError - def get_tables_and_fpaths(self) -> list[tuple[str, Path]]: + def get_tables_and_paths(self) -> list[tuple[str, Path]]: raise NotImplementedError # We assume the table file has a "csv-like" format where values are separated by a delimiter. @@ -21,5 +21,5 @@ def get_table_file_delimiter(self) -> str: # If the subclassing benchmark does not have constraints, you can return None here. # Constraints are also indexes. - def get_constraints_fpath(self) -> Optional[Path]: + def get_constraints_path(self) -> Optional[Path]: raise NotImplementedError diff --git a/dbms/postgres/build_repo.sh b/dbms/postgres/_build_repo.sh similarity index 56% rename from dbms/postgres/build_repo.sh rename to dbms/postgres/_build_repo.sh index 2127c438..d6fb839b 100755 --- a/dbms/postgres/build_repo.sh +++ b/dbms/postgres/_build_repo.sh @@ -2,14 +2,14 @@ set -euxo pipefail -REPO_REAL_PARENT_DPATH="$1" +REPO_REAL_PARENT_PATH="$1" # Download and make postgres from the boot repository. -mkdir -p "${REPO_REAL_PARENT_DPATH}" -cd "${REPO_REAL_PARENT_DPATH}" +mkdir -p "${REPO_REAL_PARENT_PATH}" +cd "${REPO_REAL_PARENT_PATH}" git clone https://github.com/lmwnshn/boot.git --single-branch --branch vldb_2024 --depth 1 cd ./boot -./cmudb/build/configure.sh release "${REPO_REAL_PARENT_DPATH}/boot/build/postgres" +./cmudb/build/configure.sh release "${REPO_REAL_PARENT_PATH}/boot/build/postgres" make clean make install-world-bin -j4 @@ -17,18 +17,18 @@ make install-world-bin -j4 cd ./cmudb/extension/boot_rs/ cargo build --release cbindgen . -o target/boot_rs.h --lang c -cd "${REPO_REAL_PARENT_DPATH}/boot" +cd "${REPO_REAL_PARENT_PATH}/boot" cd ./cmudb/extension/boot/ make clean make install -j -cd "${REPO_REAL_PARENT_DPATH}/boot" +cd "${REPO_REAL_PARENT_PATH}/boot" # Download and make hypopg. 
git clone https://github.com/HypoPG/hypopg.git cd ./hypopg -PG_CONFIG="${REPO_REAL_PARENT_DPATH}/boot/build/postgres/bin/pg_config" make install -cd "${REPO_REAL_PARENT_DPATH}/boot" +PG_CONFIG="${REPO_REAL_PARENT_PATH}/boot/build/postgres/bin/pg_config" make install +cd "${REPO_REAL_PARENT_PATH}/boot" # Download and make pg_hint_plan. # We need -L to follow links. @@ -36,6 +36,6 @@ curl -L https://github.com/ossc-db/pg_hint_plan/archive/refs/tags/REL15_1_5_1.ta tar -xzf REL15_1_5_1.tar.gz rm REL15_1_5_1.tar.gz cd ./pg_hint_plan-REL15_1_5_1 -PATH="${REPO_REAL_PARENT_DPATH}/boot/build/postgres/bin:$PATH" make -PATH="${REPO_REAL_PARENT_DPATH}/boot/build/postgres/bin:$PATH" make install -cp ./pg_hint_plan.so ${REPO_REAL_PARENT_DPATH}/boot/build/postgres/lib +PATH="${REPO_REAL_PARENT_PATH}/boot/build/postgres/bin:$PATH" make +PATH="${REPO_REAL_PARENT_PATH}/boot/build/postgres/bin:$PATH" make install +cp ./pg_hint_plan.so ${REPO_REAL_PARENT_PATH}/boot/build/postgres/lib diff --git a/dbms/postgres/cli.py b/dbms/postgres/cli.py index 393689a1..28dd1620 100644 --- a/dbms/postgres/cli.py +++ b/dbms/postgres/cli.py @@ -3,7 +3,6 @@ """ import logging -import os import shutil import subprocess from pathlib import Path @@ -11,6 +10,12 @@ import click import sqlalchemy +from gymlib.symlinks_paths import ( + get_dbdata_tgz_symlink_path, + get_pgbin_symlink_path, + get_repo_symlink_path, + linkname_to_name, +) from benchmark.constants import DEFAULT_SCALE_FACTOR from benchmark.job.load_info import JobLoadInfo @@ -33,21 +38,16 @@ WORKSPACE_PATH_PLACEHOLDER, DBGymWorkspace, fully_resolve_path, - get_dbdata_tgz_name, - get_default_dbdata_parent_dpath, - get_default_pgbin_path, + get_tmp_path_from_workspace_path, is_fully_resolved, is_ssd, - link_result, - open_and_save, - save_file, ) @click.group(name="postgres") @click.pass_obj def postgres_group(dbgym_workspace: DBGymWorkspace) -> None: - dbgym_workspace.append_group("postgres") + pass @postgres_group.command( @@ -61,7 +61,38 @@ def postgres_group(dbgym_workspace: DBGymWorkspace) -> None: help="Include this flag to rebuild Postgres even if it already exists.", ) def postgres_build(dbgym_workspace: DBGymWorkspace, rebuild: bool) -> None: - _build_repo(dbgym_workspace, rebuild) + _postgres_build(dbgym_workspace, rebuild) + + +def _postgres_build(dbgym_workspace: DBGymWorkspace, rebuild: bool) -> None: + """ + This function exists as a hook for integration tests. 
+ """ + expected_repo_symlink_path = get_repo_symlink_path( + dbgym_workspace.dbgym_workspace_path + ) + if not rebuild and expected_repo_symlink_path.exists(): + logging.getLogger(DBGYM_LOGGER_NAME).info( + f"Skipping _postgres_build: {expected_repo_symlink_path}" + ) + return + + logging.getLogger(DBGYM_LOGGER_NAME).info( + f"Setting up repo in {expected_repo_symlink_path}" + ) + repo_real_path = dbgym_workspace.dbgym_this_run_path / "repo" + repo_real_path.mkdir(parents=False, exist_ok=False) + subprocess_run( + f"./_build_repo.sh {repo_real_path}", + cwd=dbgym_workspace.base_dbgym_repo_path / "dbms" / "postgres", + ) + + # only link at the end so that the link only ever points to a complete repo + repo_symlink_path = dbgym_workspace.link_result(repo_real_path) + assert expected_repo_symlink_path.samefile(repo_symlink_path) + logging.getLogger(DBGYM_LOGGER_NAME).info( + f"Set up repo in {expected_repo_symlink_path}" + ) @postgres_group.command( @@ -75,19 +106,19 @@ def postgres_build(dbgym_workspace: DBGymWorkspace, rebuild: bool) -> None: "--pgbin-path", type=Path, default=None, - help=f"The path to the bin containing Postgres executables. The default is {get_default_pgbin_path(WORKSPACE_PATH_PLACEHOLDER)}.", + help=f"The path to the bin containing Postgres executables. The default is {get_pgbin_symlink_path(WORKSPACE_PATH_PLACEHOLDER)}.", ) @click.option( "--intended-dbdata-hardware", type=click.Choice(["hdd", "ssd"]), default="hdd", - help=f"The intended hardware dbdata should be on. Used as a sanity check for --dbdata-parent-dpath.", + help=f"The intended hardware dbdata should be on. Used as a sanity check for --dbdata-parent-path.", ) @click.option( - "--dbdata-parent-dpath", + "--dbdata-parent-path", default=None, type=Path, - help=f"The path to the parent directory of the dbdata which will be actively tuned. The default is {get_default_dbdata_parent_dpath(WORKSPACE_PATH_PLACEHOLDER)}.", + help=f"The path to the parent directory of the dbdata which will be actively tuned. The default is {get_tmp_path_from_workspace_path(WORKSPACE_PATH_PLACEHOLDER)}.", ) def postgres_dbdata( dbgym_workspace: DBGymWorkspace, @@ -95,69 +126,58 @@ def postgres_dbdata( scale_factor: float, pgbin_path: Optional[Path], intended_dbdata_hardware: str, - dbdata_parent_dpath: Optional[Path], + dbdata_parent_path: Optional[Path], +) -> None: + _postgres_dbdata( + dbgym_workspace, + benchmark_name, + scale_factor, + pgbin_path, + intended_dbdata_hardware, + dbdata_parent_path, + ) + + +def _postgres_dbdata( + dbgym_workspace: DBGymWorkspace, + benchmark_name: str, + scale_factor: float, + pgbin_path: Optional[Path], + intended_dbdata_hardware: str, + dbdata_parent_path: Optional[Path], ) -> None: + """ + This function exists as a hook for integration tests. + """ # Set args to defaults programmatically (do this before doing anything else in the function) if pgbin_path is None: - pgbin_path = get_default_pgbin_path(dbgym_workspace.dbgym_workspace_path) - if dbdata_parent_dpath is None: - dbdata_parent_dpath = get_default_dbdata_parent_dpath( + pgbin_path = get_pgbin_symlink_path(dbgym_workspace.dbgym_workspace_path) + if dbdata_parent_path is None: + dbdata_parent_path = get_tmp_path_from_workspace_path( dbgym_workspace.dbgym_workspace_path ) # Fully resolve all input paths. 
pgbin_path = fully_resolve_path(pgbin_path) - dbdata_parent_dpath = fully_resolve_path(dbdata_parent_dpath) + dbdata_parent_path = fully_resolve_path(dbdata_parent_path) # Check assertions on args if intended_dbdata_hardware == "hdd": assert not is_ssd( - dbdata_parent_dpath - ), f"Intended hardware is HDD but dbdata_parent_dpath ({dbdata_parent_dpath}) is an SSD" + dbdata_parent_path + ), f"Intended hardware is HDD but dbdata_parent_path ({dbdata_parent_path}) is an SSD" elif intended_dbdata_hardware == "ssd": assert is_ssd( - dbdata_parent_dpath - ), f"Intended hardware is SSD but dbdata_parent_dpath ({dbdata_parent_dpath}) is an HDD" + dbdata_parent_path + ), f"Intended hardware is SSD but dbdata_parent_path ({dbdata_parent_path}) is an HDD" else: - assert False + assert ( + False + ), f'Intended hardware is "{intended_dbdata_hardware}" which is invalid' # Create dbdata _create_dbdata( - dbgym_workspace, benchmark_name, scale_factor, pgbin_path, dbdata_parent_dpath - ) - - -def _get_pgbin_symlink_path(dbgym_workspace: DBGymWorkspace) -> Path: - return dbgym_workspace.cur_symlinks_build_path( - "repo.link", "boot", "build", "postgres", "bin" - ) - - -def _get_repo_symlink_path(dbgym_workspace: DBGymWorkspace) -> Path: - return dbgym_workspace.cur_symlinks_build_path("repo.link") - - -def _build_repo(dbgym_workspace: DBGymWorkspace, rebuild: bool) -> None: - expected_repo_symlink_dpath = _get_repo_symlink_path(dbgym_workspace) - if not rebuild and expected_repo_symlink_dpath.exists(): - logging.getLogger(DBGYM_LOGGER_NAME).info( - f"Skipping _build_repo: {expected_repo_symlink_dpath}" - ) - return - - logging.getLogger(DBGYM_LOGGER_NAME).info( - f"Setting up repo in {expected_repo_symlink_dpath}" - ) - repo_real_dpath = dbgym_workspace.cur_task_runs_build_path("repo", mkdir=True) - subprocess_run( - f"./build_repo.sh {repo_real_dpath}", cwd=dbgym_workspace.cur_source_path() - ) - - # only link at the end so that the link only ever points to a complete repo - repo_symlink_dpath = link_result(dbgym_workspace, repo_real_dpath) - assert expected_repo_symlink_dpath.samefile(repo_symlink_dpath) - logging.getLogger(DBGYM_LOGGER_NAME).info( - f"Set up repo in {expected_repo_symlink_dpath}" + dbgym_workspace, benchmark_name, scale_factor, pgbin_path, dbdata_parent_path ) @@ -166,49 +186,56 @@ def _create_dbdata( benchmark_name: str, scale_factor: float, pgbin_path: Path, - dbdata_parent_dpath: Path, + dbdata_parent_path: Path, ) -> None: """ - I chose *not* for this function to skip by default if dbdata_tgz_symlink_path already exists. This - is because, while the generated data is deterministic given benchmark_name and scale_factor, any - change in the _create_dbdata() function would result in a different dbdata. Since _create_dbdata() - may change somewhat frequently, I decided to get rid of the footgun of having changes to - _create_dbdata() not propagate to [dbdata].tgz by default. + If you change the code of _create_dbdata(), you should also delete the symlink so that the next time you run + `dbms postgres dbdata` it will re-create the dbdata. """ + expected_dbdata_tgz_symlink_path = get_dbdata_tgz_symlink_path( + dbgym_workspace.dbgym_workspace_path, + benchmark_name, + scale_factor, + ) + if expected_dbdata_tgz_symlink_path.exists(): + logging.getLogger(DBGYM_LOGGER_NAME).info( + f"Skipping _create_dbdata: {expected_dbdata_tgz_symlink_path}" + ) + return # It's ok for the dbdata/ directory to be temporary. It just matters that the .tgz is saved in a safe place. 
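Aside (illustrative sketch, not part of the change): the .tgz is the durable artifact because it is what later gets restored — `PostgresConn._restore_snapshot()` further down in this PR unpacks it into a fresh dbdata directory. A sketch of that consumer side, mirroring the tar and port-imprint logic shown below in env/pg_conn.py:

```python
import subprocess
from pathlib import Path


def restore_dbdata_snapshot(snapshot_tgz: Path, dbdata_path: Path, pgport: int) -> None:
    # Wipe and recreate the target, then unpack the snapshot directly into it.
    subprocess.run(["rm", "-rf", str(dbdata_path)], check=True)
    dbdata_path.mkdir(mode=0o700, parents=True)
    subprocess.run(
        ["tar", "xf", str(snapshot_tgz), "-C", str(dbdata_path), "--strip-components", "1"],
        check=True,
    )
    # Imprint the port so multiple Postgres instances can coexist on one machine.
    with open(dbdata_path / "postgresql.conf", "a") as f:
        f.write(f"port={pgport}\n")
```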
- dbdata_dpath = dbdata_parent_dpath / "dbdata_being_created" - # We might be reusing the same dbdata_parent_dpath, so delete dbdata_dpath if it already exists - if dbdata_dpath.exists(): - shutil.rmtree(dbdata_dpath) + dbdata_path = dbdata_parent_path / "dbdata_being_created" + # We might be reusing the same dbdata_parent_path, so delete dbdata_path if it already exists + if dbdata_path.exists(): + shutil.rmtree(dbdata_path) # Call initdb. - # Save any script we call from pgbin_symlink_dpath because they are dependencies generated from another task run. - save_file(dbgym_workspace, pgbin_path / "initdb") - subprocess_run(f'./initdb -D "{dbdata_dpath}"', cwd=pgbin_path) + # Save any script we call from pgbin_symlink_path because they are dependencies generated from another task run. + dbgym_workspace.save_file(pgbin_path / "initdb") + subprocess_run(f'./initdb -D "{dbdata_path}"', cwd=pgbin_path) # Start Postgres (all other dbdata setup requires postgres to be started). # Note that subprocess_run() never returns when running "pg_ctl start", so I'm using subprocess.run() instead. - start_postgres(dbgym_workspace, pgbin_path, dbdata_dpath) + start_postgres(dbgym_workspace, pgbin_path, dbdata_path) # Set up Postgres. _generic_dbdata_setup(dbgym_workspace) _load_benchmark_into_dbdata(dbgym_workspace, benchmark_name, scale_factor) # Stop Postgres so that we don't "leak" processes. - stop_postgres(dbgym_workspace, pgbin_path, dbdata_dpath) + stop_postgres(dbgym_workspace, pgbin_path, dbdata_path) # Create .tgz file. - # Note that you can't pass "[dbdata].tgz" as an arg to cur_task_runs_data_path() because that would create "[dbdata].tgz" as a dir. - dbdata_tgz_real_fpath = dbgym_workspace.cur_task_runs_data_path( - mkdir=True - ) / get_dbdata_tgz_name(benchmark_name, scale_factor) - # We need to cd into dbdata_dpath so that the tar file does not contain folders for the whole path of dbdata_dpath. - subprocess_run(f"tar -czf {dbdata_tgz_real_fpath} .", cwd=dbdata_dpath) + dbdata_tgz_real_path = dbgym_workspace.dbgym_this_run_path / linkname_to_name( + expected_dbdata_tgz_symlink_path.name + ) + # We need to cd into dbdata_path so that the tar file does not contain folders for the whole path of dbdata_path. + subprocess_run(f"tar -czf {dbdata_tgz_real_path} .", cwd=dbdata_path) # Create symlink. # Only link at the end so that the link only ever points to a complete dbdata. 
- dbdata_tgz_symlink_path = link_result(dbgym_workspace, dbdata_tgz_real_fpath) + dbdata_tgz_symlink_path = dbgym_workspace.link_result(dbdata_tgz_real_path) + assert expected_dbdata_tgz_symlink_path.samefile(dbdata_tgz_symlink_path) logging.getLogger(DBGYM_LOGGER_NAME).info( f"Created dbdata in {dbdata_tgz_symlink_path}" ) @@ -216,21 +243,23 @@ def _create_dbdata( def _generic_dbdata_setup(dbgym_workspace: DBGymWorkspace) -> None: # get necessary vars - pgbin_real_dpath = _get_pgbin_symlink_path(dbgym_workspace).resolve() - assert pgbin_real_dpath.exists() + pgbin_real_path = get_pgbin_symlink_path( + dbgym_workspace.dbgym_workspace_path + ).resolve() + assert pgbin_real_path.exists() dbgym_pguser = DBGYM_POSTGRES_USER dbgym_pgpass = DBGYM_POSTGRES_PASS pgport = DEFAULT_POSTGRES_PORT # Create user - save_file(dbgym_workspace, pgbin_real_dpath / "psql") + dbgym_workspace.save_file(pgbin_real_path / "psql") subprocess_run( f"./psql -c \"create user {dbgym_pguser} with superuser password '{dbgym_pgpass}'\" {DEFAULT_POSTGRES_DBNAME} -p {pgport} -h localhost", - cwd=pgbin_real_dpath, + cwd=pgbin_real_path, ) subprocess_run( f'./psql -c "grant pg_monitor to {dbgym_pguser}" {DEFAULT_POSTGRES_DBNAME} -p {pgport} -h localhost', - cwd=pgbin_real_dpath, + cwd=pgbin_real_path, ) # Load shared preload libraries @@ -239,14 +268,14 @@ def _generic_dbdata_setup(dbgym_workspace: DBGymWorkspace) -> None: # You have to use TO and you can't put single quotes around the libraries (https://postgrespro.com/list/thread-id/2580120) # The method I wrote here works for both one library and multiple libraries f'./psql -c "ALTER SYSTEM SET shared_preload_libraries TO {SHARED_PRELOAD_LIBRARIES};" {DEFAULT_POSTGRES_DBNAME} -p {pgport} -h localhost', - cwd=pgbin_real_dpath, + cwd=pgbin_real_path, ) # Create the dbgym database. Since one dbdata dir maps to one benchmark, all benchmarks will use the same database # as opposed to using databases named after the benchmark. subprocess_run( f"./psql -c \"create database {DBGYM_POSTGRES_DBNAME} with owner = '{dbgym_pguser}'\" {DEFAULT_POSTGRES_DBNAME} -p {pgport} -h localhost", - cwd=pgbin_real_dpath, + cwd=pgbin_real_path, ) @@ -273,14 +302,14 @@ def _load_into_dbdata( conn: sqlalchemy.Connection, load_info: LoadInfoBaseClass, ) -> None: - sql_file_execute(dbgym_workspace, conn, load_info.get_schema_fpath()) + sql_file_execute(dbgym_workspace, conn, load_info.get_schema_path()) # Truncate all tables first before even loading a single one. - for table, _ in load_info.get_tables_and_fpaths(): + for table, _ in load_info.get_tables_and_paths(): sqlalchemy_conn_execute(conn, f"TRUNCATE {table} CASCADE") # Then, load the tables. - for table, table_fpath in load_info.get_tables_and_fpaths(): - with open_and_save(dbgym_workspace, table_fpath, "r") as table_csv: + for table, table_path in load_info.get_tables_and_paths(): + with dbgym_workspace.open_and_save(table_path, "r") as table_csv: assert conn.connection.dbapi_connection is not None cur = conn.connection.dbapi_connection.cursor() try: @@ -292,9 +321,9 @@ def _load_into_dbdata( finally: cur.close() - constraints_fpath = load_info.get_constraints_fpath() - if constraints_fpath is not None: - sql_file_execute(dbgym_workspace, conn, constraints_fpath) + constraints_path = load_info.get_constraints_path() + if constraints_path is not None: + sql_file_execute(dbgym_workspace, conn, constraints_path) # The start and stop functions slightly duplicate functionality from pg_conn.py. 
However, I chose to do it this way @@ -303,41 +332,41 @@ def _load_into_dbdata( # even though they are a little redundant. It seems better than making `dbms` depend on the behavior of the # tuning environment. def start_postgres( - dbgym_workspace: DBGymWorkspace, pgbin_path: Path, dbdata_dpath: Path + dbgym_workspace: DBGymWorkspace, pgbin_path: Path, dbdata_path: Path ) -> None: - _start_or_stop_postgres(dbgym_workspace, pgbin_path, dbdata_dpath, True) + _start_or_stop_postgres(dbgym_workspace, pgbin_path, dbdata_path, True) def stop_postgres( - dbgym_workspace: DBGymWorkspace, pgbin_path: Path, dbdata_dpath: Path + dbgym_workspace: DBGymWorkspace, pgbin_path: Path, dbdata_path: Path ) -> None: - _start_or_stop_postgres(dbgym_workspace, pgbin_path, dbdata_dpath, False) + _start_or_stop_postgres(dbgym_workspace, pgbin_path, dbdata_path, False) def _start_or_stop_postgres( dbgym_workspace: DBGymWorkspace, pgbin_path: Path, - dbdata_dpath: Path, + dbdata_path: Path, is_start: bool, ) -> None: # They should be absolute paths and should exist assert is_fully_resolved(pgbin_path) - assert is_fully_resolved(dbdata_dpath) + assert is_fully_resolved(dbdata_path) pgport = DEFAULT_POSTGRES_PORT - save_file(dbgym_workspace, pgbin_path / "pg_ctl") + dbgym_workspace.save_file(pgbin_path / "pg_ctl") if is_start: # We use subprocess.run() because subprocess_run() never returns when running "pg_ctl start". # The reason subprocess_run() never returns is because pg_ctl spawns a postgres process so .poll() always returns None. # On the other hand, subprocess.run() does return normally, like calling `./pg_ctl` on the command line would do. result = subprocess.run( - f"./pg_ctl -D \"{dbdata_dpath}\" -o '-p {pgport}' start", + f"./pg_ctl -D \"{dbdata_path}\" -o '-p {pgport}' start", cwd=pgbin_path, shell=True, ) result.check_returncode() else: subprocess_run( - f"./pg_ctl -D \"{dbdata_dpath}\" -o '-p {pgport}' stop", + f"./pg_ctl -D \"{dbdata_path}\" -o '-p {pgport}' stop", cwd=pgbin_path, ) diff --git a/dbms/tests/__init__.py b/dbms/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/dbms/tests/dbms_integtest_dbgym_config.yaml b/dbms/tests/dbms_integtest_dbgym_config.yaml new file mode 100644 index 00000000..e8e37294 --- /dev/null +++ b/dbms/tests/dbms_integtest_dbgym_config.yaml @@ -0,0 +1 @@ +dbgym_workspace_path: ../dbgym_dbms_integtest_workspace/ diff --git a/dbms/tests/integtest_dbms.py b/dbms/tests/integtest_dbms.py new file mode 100644 index 00000000..822e8aac --- /dev/null +++ b/dbms/tests/integtest_dbms.py @@ -0,0 +1,67 @@ +import os +import shutil +import unittest +from pathlib import Path + +from gymlib.symlinks_paths import get_dbdata_tgz_symlink_path, get_repo_symlink_path + +from benchmark.tpch.cli import _tpch_tables +from dbms.postgres.cli import _postgres_build, _postgres_dbdata +from util.workspace import ( + DBGymWorkspace, + fully_resolve_path, + get_workspace_path_from_config, +) + + +class DBMSTests(unittest.TestCase): + DBGYM_CONFIG_PATH = Path("dbms/tests/dbms_integtest_dbgym_config.yaml") + + def setUp(self) -> None: + workspace_path = get_workspace_path_from_config(DBMSTests.DBGYM_CONFIG_PATH) + # Get a clean start each time. + if workspace_path.exists(): + shutil.rmtree(workspace_path) + + # Reset this to avoid the error of it being created twice. + # In real usage, the second run would be a different Python process so DBGymWorkspace._num_times_created_this_run would be 0. 
+ DBGymWorkspace._num_times_created_this_run = 0 + self.workspace = DBGymWorkspace(workspace_path) + + def tearDown(self) -> None: + if self.workspace.dbgym_workspace_path.exists(): + shutil.rmtree(self.workspace.dbgym_workspace_path) + + def test_postgres_build(self) -> None: + repo_path = get_repo_symlink_path(self.workspace.dbgym_workspace_path) + self.assertFalse(repo_path.exists()) + _postgres_build(self.workspace, False) + self.assertTrue(repo_path.exists()) + self.assertTrue(fully_resolve_path(repo_path).exists()) + + def test_postgres_dbdata(self) -> None: + # Setup + # Make sure to recreate self.workspace so that each function call counts as its own run. + scale_factor = 0.01 + _postgres_build(self.workspace, False) + DBGymWorkspace._num_times_created_this_run = 0 + self.workspace = DBGymWorkspace(self.workspace.dbgym_workspace_path) + _tpch_tables(self.workspace, scale_factor) + DBGymWorkspace._num_times_created_this_run = 0 + self.workspace = DBGymWorkspace(self.workspace.dbgym_workspace_path) + + # Test + dbdata_tgz_path = get_dbdata_tgz_symlink_path( + self.workspace.dbgym_workspace_path, "tpch", scale_factor + ) + self.assertFalse(dbdata_tgz_path.exists()) + intended_dbdata_hardware = os.environ.get("INTENDED_DBDATA_HARDWARE", "hdd") + _postgres_dbdata( + self.workspace, "tpch", scale_factor, None, intended_dbdata_hardware, None + ) + self.assertTrue(dbdata_tgz_path.exists()) + self.assertTrue(fully_resolve_path(dbdata_tgz_path).exists()) + + +if __name__ == "__main__": + unittest.main() diff --git a/env/pg_conn.py b/env/pg_conn.py index 17b36499..7aa4e5d8 100644 --- a/env/pg_conn.py +++ b/env/pg_conn.py @@ -24,7 +24,7 @@ from util.log import DBGYM_LOGGER_NAME from util.pg import DBGYM_POSTGRES_DBNAME, SHARED_PRELOAD_LIBRARIES, get_kv_connstr -from util.workspace import DBGymWorkspace, open_and_save, parent_dpath_of_path +from util.workspace import DBGymWorkspace, parent_path_of_path CONNECT_TIMEOUT = 300 @@ -37,36 +37,36 @@ def __init__( self, dbgym_workspace: DBGymWorkspace, pgport: int, - pristine_dbdata_snapshot_fpath: Path, - dbdata_parent_dpath: Path, + pristine_dbdata_snapshot_path: Path, + dbdata_parent_path: Path, pgbin_path: Union[str, Path], # Whether this is None determines whether Boot is enabled. - boot_config_fpath: Optional[Path], + boot_config_path: Optional[Path], ) -> None: self.dbgym_workspace = dbgym_workspace self.pgport = pgport self.pgbin_path = pgbin_path - self.boot_config_fpath = boot_config_fpath + self.boot_config_path = boot_config_path self.log_step = 0 # All the paths related to dbdata - # pristine_dbdata_snapshot_fpath is the .tgz snapshot that represents the starting state + # pristine_dbdata_snapshot_path is the .tgz snapshot that represents the starting state # of the database (with the default configuration). It is generated by a call to # `python tune.py dbms postgres ...` and should not be overwritten. - self.pristine_dbdata_snapshot_fpath = pristine_dbdata_snapshot_fpath - # checkpoint_dbdata_snapshot_fpath is the .tgz snapshot that represents the current + self.pristine_dbdata_snapshot_path = pristine_dbdata_snapshot_path + # checkpoint_dbdata_snapshot_path is the .tgz snapshot that represents the current # state of the database as it is being tuned. It is generated while tuning and is # discarded once tuning is completed. 
- self.checkpoint_dbdata_snapshot_fpath = ( + self.checkpoint_dbdata_snapshot_path = ( dbgym_workspace.dbgym_tmp_path / "checkpoint_dbdata.tgz" ) - # dbdata_parent_dpath is the parent directory of the dbdata that is *actively being tuned*. - # It is *not* the parent directory of pristine_dbdata_snapshot_fpath. + # dbdata_parent_path is the parent directory of the dbdata that is *actively being tuned*. + # It is *not* the parent directory of pristine_dbdata_snapshot_path. # Setting this lets us control the hardware device dbdata is built on (e.g. HDD vs. SSD). - self.dbdata_parent_dpath = dbdata_parent_dpath - # dbdata_dpath is the dbdata that is *actively being tuned* - self.dbdata_dpath = self.dbdata_parent_dpath / f"dbdata{self.pgport}" + self.dbdata_parent_path = dbdata_parent_path + # dbdata_path is the dbdata that is *actively being tuned* + self.dbdata_path = self.dbdata_parent_path / f"dbdata{self.pgport}" self._conn: Optional[psycopg.Connection[Any]] = None self.hint_check_failed_with: Optional[str] = None @@ -102,16 +102,13 @@ def disconnect(self) -> None: self._conn = None def move_log(self) -> None: - pglog_fpath = ( - self.dbgym_workspace.cur_task_runs_artifacts_path(mkdir=True) - / f"pg{self.pgport}.log" - ) - pglog_this_step_fpath = ( - self.dbgym_workspace.cur_task_runs_artifacts_path(mkdir=True) + pglog_path = self.dbgym_workspace.dbgym_this_run_path / f"pg{self.pgport}.log" + pglog_this_step_path = ( + self.dbgym_workspace.dbgym_this_run_path / f"pg{self.pgport}.log.{self.log_step}" ) - if pglog_fpath.exists(): - shutil.move(pglog_fpath, pglog_this_step_fpath) + if pglog_path.exists(): + shutil.move(pglog_path, pglog_this_step_path) self.log_step += 1 def force_statement_timeout(self, timeout: float) -> None: @@ -202,13 +199,13 @@ def time_query( def shutdown_postgres(self) -> None: """Shuts down postgres.""" self.disconnect() - if not Path(self.dbdata_dpath).exists(): + if not Path(self.dbdata_path).exists(): return while True: logging.getLogger(DBGYM_LOGGER_NAME).debug("Shutting down postgres...") _, stdout, stderr = local[f"{self.pgbin_path}/pg_ctl"][ - "stop", "--wait", "-t", "180", "-D", self.dbdata_dpath + "stop", "--wait", "-t", "180", "-D", self.dbdata_path ].run(retcode=None) time.sleep(1) logging.getLogger(DBGYM_LOGGER_NAME).debug( @@ -225,7 +222,7 @@ def shutdown_postgres(self) -> None: DBGYM_POSTGRES_DBNAME, ].run(retcode=None) - exists = (Path(self.dbdata_dpath) / "postmaster.pid").exists() + exists = (Path(self.dbdata_path) / "postmaster.pid").exists() if not exists and retcode != 0: break @@ -241,7 +238,7 @@ def restart_with_changes( ) -> bool: """ This function is called "(re)start" because it also shuts down Postgres before starting it. - This function assumes that some snapshot has already been untarred into self.dbdata_dpath. + This function assumes that some snapshot has already been untarred into self.dbdata_path. You can do this by calling one of the wrappers around _restore_snapshot(). Note that multiple calls are not "additive". Calling this will restart from the latest saved @@ -250,7 +247,7 @@ def restart_with_changes( """ # Install the new configuration changes. 
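Aside (illustrative, not part of the change): the block below writes one `knob = val` line per entry of `conf_changes` into `postgresql.auto.conf`. For a hypothetical `conf_changes` (knob values invented for illustration, not taken from this PR):

```python
conf_changes = {"shared_buffers": "'4GB'", "work_mem": "'64MB'"}
contents = "\n".join(f"{knob} = {val}" for knob, val in conf_changes.items())
# contents == "shared_buffers = '4GB'\nwork_mem = '64MB'"
```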
if conf_changes is not None: - dbdata_auto_conf_path = self.dbdata_dpath / "postgresql.auto.conf" + dbdata_auto_conf_path = self.dbdata_path / "postgresql.auto.conf" with open(dbdata_auto_conf_path, "w") as f: f.write( "\n".join([f"{knob} = {val}" for knob, val in conf_changes.items()]) @@ -273,14 +270,14 @@ def restart_with_changes( "cf", # We append .tmp so that if we fail in the *middle* of running tar, we # still have the previous checkpoint available to us - f"{self.checkpoint_dbdata_snapshot_fpath}.tmp", + f"{self.checkpoint_dbdata_snapshot_path}.tmp", "-C", - parent_dpath_of_path(self.dbdata_dpath), - self.dbdata_dpath, + parent_path_of_path(self.dbdata_path), + self.dbdata_path, ].run() # Make sure the PID lock file doesn't exist. - pid_lock = Path(f"{self.dbdata_dpath}/postmaster.pid") + pid_lock = Path(f"{self.dbdata_path}/postmaster.pid") assert not pid_lock.exists() if dump_page_cache: @@ -292,15 +289,14 @@ def restart_with_changes( # Try starting up. retcode, stdout, stderr = local[f"{self.pgbin_path}/pg_ctl"][ "-D", - self.dbdata_dpath, + self.dbdata_path, "--wait", "-t", "180", "-l", # We log to pg{self.pgport}.log instead of pg.log so that different PostgresConn objects # don't all try to write to the same file. - self.dbgym_workspace.cur_task_runs_artifacts_path(mkdir=True) - / f"pg{self.pgport}.log", + self.dbgym_workspace.dbgym_this_run_path / f"pg{self.pgport}.log", "start", ].run(retcode=None) @@ -345,8 +341,8 @@ def restart_with_changes( ) # Set up Boot if we're told to do so - if self.boot_config_fpath is not None: - with open_and_save(self.dbgym_workspace, self.boot_config_fpath) as f: + if self.boot_config_path is not None: + with self.dbgym_workspace.open_and_save(self.boot_config_path) as f: boot_config = yaml.safe_load(f) self._set_up_boot( @@ -362,7 +358,7 @@ def restart_with_changes( # Move the temporary over since we now know the temporary can load. if save_checkpoint: - shutil.move(f"{self.dbdata_dpath}.tgz.tmp", f"{self.dbdata_dpath}.tgz") + shutil.move(f"{self.dbdata_path}.tgz.tmp", f"{self.dbdata_path}.tgz") return True @@ -483,10 +479,10 @@ def get_system_knobs(self) -> dict[str, str]: return knobs def restore_pristine_snapshot(self) -> bool: - return self._restore_snapshot(self.pristine_dbdata_snapshot_fpath) + return self._restore_snapshot(self.pristine_dbdata_snapshot_path) def restore_checkpointed_snapshot(self) -> bool: - return self._restore_snapshot(self.checkpoint_dbdata_snapshot_fpath) + return self._restore_snapshot(self.checkpoint_dbdata_snapshot_path) def _restore_snapshot( self, @@ -494,23 +490,23 @@ def _restore_snapshot( ) -> bool: self.shutdown_postgres() - local["rm"]["-rf", self.dbdata_dpath].run() - local["mkdir"]["-m", "0700", "-p", self.dbdata_dpath].run() + local["rm"]["-rf", self.dbdata_path].run() + local["mkdir"]["-m", "0700", "-p", self.dbdata_path].run() - # Strip the "dbdata" so we can implant directly into the target dbdata_dpath. + # Strip the "dbdata" so we can implant directly into the target dbdata_path. assert dbdata_snapshot_path.exists() local["tar"][ "xf", dbdata_snapshot_path, "-C", - self.dbdata_dpath, + self.dbdata_path, "--strip-components", "1", ].run() # Imprint the required port. 
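The plumbum expression below appends a single line to postgresql.conf, much like `echo "port=5432" >> postgresql.conf` in a shell. A plain-Python sketch of the same append (port and directory are hypothetical):

    import tempfile
    from pathlib import Path

    pgport = 5432
    dbdata_path = Path(tempfile.mkdtemp())  # stands in for the freshly restored dbdata dir
    with open(dbdata_path / "postgresql.conf", "a") as f:
        f.write(f"port={pgport}\n")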
( (local["echo"][f"port={self.pgport}"]) - >> f"{self.dbdata_dpath}/postgresql.conf" + >> f"{self.dbdata_path}/postgresql.conf" )() return self.restart_postgres() diff --git a/env/replay.py b/env/replay.py index 8dcfe8e4..55621284 100644 --- a/env/replay.py +++ b/env/replay.py @@ -9,7 +9,7 @@ def replay( - dbgym_workspace: DBGymWorkspace, tuning_artifacts_dpath: Path + dbgym_workspace: DBGymWorkspace, tuning_artifacts_path: Path ) -> list[tuple[float, int]]: """ Returns the total runtime and the number of timed out queries for each step. @@ -18,7 +18,7 @@ def replay( """ replay_data: list[tuple[float, int]] = [] - reader = TuningArtifactsReader(tuning_artifacts_dpath) + reader = TuningArtifactsReader(tuning_artifacts_path) pg_conn = PostgresConn( dbgym_workspace, DEFAULT_POSTGRES_PORT, diff --git a/env/tests/_set_up_gymlib_integtest_workspace.sh b/env/tests/_set_up_gymlib_integtest_workspace.sh index bd2130a7..b1538a70 100755 --- a/env/tests/_set_up_gymlib_integtest_workspace.sh +++ b/env/tests/_set_up_gymlib_integtest_workspace.sh @@ -11,12 +11,12 @@ # the Postgres repo is very large and (b) the built binary will be different for different machines. # This script should be run from the base dbgym/ directory. -set -euo pipefail +set -euxo pipefail # INTENDED_DBDATA_HARDWARE can be set elsewhere (e.g. by tests_ci.yaml) but we use hdd by default. INTENDED_DBDATA_HARDWARE="${INTENDED_DBDATA_HARDWARE:-hdd}" -python3 task.py benchmark $BENCHMARK data $SCALE_FACTOR +python3 task.py benchmark $BENCHMARK tables $SCALE_FACTOR python3 task.py benchmark $BENCHMARK workload --scale-factor $SCALE_FACTOR python3 task.py dbms postgres build diff --git a/env/tests/gymlib_integtest_util.py b/env/tests/gymlib_integtest_util.py index 86b3b829..44ec3c4c 100644 --- a/env/tests/gymlib_integtest_util.py +++ b/env/tests/gymlib_integtest_util.py @@ -3,16 +3,20 @@ from pathlib import Path from typing import Optional +# TODO: remove symlinks_paths from the import +from gymlib.symlinks_paths import ( + get_dbdata_tgz_symlink_path, + get_pgbin_symlink_path, + get_workload_suffix, + get_workload_symlink_path, +) + +from benchmark.tpch.constants import DEFAULT_TPCH_SEED from env.tuning_artifacts import TuningMetadata from util.workspace import ( DBGymWorkspace, fully_resolve_path, - get_default_dbdata_parent_dpath, - get_default_pgbin_path, - get_default_pristine_dbdata_snapshot_path, - get_default_workload_name_suffix, - get_default_workload_path, - get_workload_name, + get_tmp_path_from_workspace_path, get_workspace_path_from_config, ) @@ -31,18 +35,20 @@ class GymlibIntegtestManager: BENCHMARK = "tpch" SCALE_FACTOR = 0.01 DBGYM_CONFIG_PATH = Path("env/tests/gymlib_integtest_dbgym_config.yaml") - - # This is set at most once by set_up_workspace(). - DBGYM_WORKSPACE: Optional[DBGymWorkspace] = None + WORKSPACE_PATH: Optional[Path] = None @staticmethod def set_up_workspace() -> None: - workspace_path = get_workspace_path_from_config( + """ + Set up the workspace if it has not already been set up. + None of the integtest_*.py files will delete the workspace so that future tests run faster. + """ + GymlibIntegtestManager.WORKSPACE_PATH = get_workspace_path_from_config( GymlibIntegtestManager.DBGYM_CONFIG_PATH ) # This if statement prevents us from setting up the workspace twice, which saves time. 
- if not workspace_path.exists(): + if not GymlibIntegtestManager.WORKSPACE_PATH.exists(): subprocess.run( ["./env/tests/_set_up_gymlib_integtest_workspace.sh"], env={ @@ -56,45 +62,42 @@ def set_up_workspace() -> None: check=True, ) - # Once we get here, we have an invariant that the workspace exists. We need this - # invariant to be true in order to create the DBGymWorkspace. - # - # However, it also can't be created more than once so we need to check `is None`. - if GymlibIntegtestManager.DBGYM_WORKSPACE is None: - GymlibIntegtestManager.DBGYM_WORKSPACE = DBGymWorkspace(workspace_path) - @staticmethod - def get_dbgym_workspace() -> DBGymWorkspace: - assert GymlibIntegtestManager.DBGYM_WORKSPACE is not None - return GymlibIntegtestManager.DBGYM_WORKSPACE + def get_workspace_path() -> Path: + assert GymlibIntegtestManager.WORKSPACE_PATH is not None + return GymlibIntegtestManager.WORKSPACE_PATH @staticmethod def get_default_metadata() -> TuningMetadata: - dbgym_workspace = GymlibIntegtestManager.get_dbgym_workspace() + assert GymlibIntegtestManager.BENCHMARK == "tpch" + suffix = get_workload_suffix( + GymlibIntegtestManager.BENCHMARK, + seed_start=DEFAULT_TPCH_SEED, + seed_end=DEFAULT_TPCH_SEED, + query_subset="all", + ) return TuningMetadata( workload_path=fully_resolve_path( - get_default_workload_path( - dbgym_workspace.dbgym_workspace_path, + get_workload_symlink_path( + GymlibIntegtestManager.get_workspace_path(), GymlibIntegtestManager.BENCHMARK, - get_workload_name( - GymlibIntegtestManager.SCALE_FACTOR, - get_default_workload_name_suffix( - GymlibIntegtestManager.BENCHMARK - ), - ), + GymlibIntegtestManager.SCALE_FACTOR, + suffix, ), ), pristine_dbdata_snapshot_path=fully_resolve_path( - get_default_pristine_dbdata_snapshot_path( - dbgym_workspace.dbgym_workspace_path, + get_dbdata_tgz_symlink_path( + GymlibIntegtestManager.get_workspace_path(), GymlibIntegtestManager.BENCHMARK, GymlibIntegtestManager.SCALE_FACTOR, ), ), dbdata_parent_path=fully_resolve_path( - get_default_dbdata_parent_dpath(dbgym_workspace.dbgym_workspace_path), + get_tmp_path_from_workspace_path( + GymlibIntegtestManager.get_workspace_path() + ), ), pgbin_path=fully_resolve_path( - get_default_pgbin_path(dbgym_workspace.dbgym_workspace_path), + get_pgbin_symlink_path(GymlibIntegtestManager.get_workspace_path()), ), ) diff --git a/env/tests/integtest_pg_conn.py b/env/tests/integtest_pg_conn.py index a5e11f4d..da586398 100644 --- a/env/tests/integtest_pg_conn.py +++ b/env/tests/integtest_pg_conn.py @@ -10,12 +10,20 @@ get_is_postgres_running, get_running_postgres_ports, ) +from util.workspace import DBGymWorkspace class PostgresConnTests(unittest.TestCase): + workspace: DBGymWorkspace + @staticmethod def setUpClass() -> None: GymlibIntegtestManager.set_up_workspace() + # Reset _num_times_created_this_run since previous tests may have created a workspace. 
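For reference, the TuningMetadata assembled in get_default_metadata() above leans on the new gymlib.symlinks_paths helpers introduced in this patch. A short usage sketch (the workspace path and seed below are placeholders; the 0.01 scale factor matches GymlibIntegtestManager.SCALE_FACTOR):

    from pathlib import Path

    from gymlib.symlinks_paths import (
        get_dbdata_tgz_symlink_path,
        get_workload_suffix,
        get_workload_symlink_path,
    )

    workspace_path = Path("/workspace")  # placeholder
    suffix = get_workload_suffix("tpch", seed_start=15721, seed_end=15721, query_subset="all")
    print(get_workload_symlink_path(workspace_path, "tpch", 0.01, suffix))
    # /workspace/symlinks/dbgym/workload_tpch_sf0point01_15721_15721_all.link
    print(get_dbdata_tgz_symlink_path(workspace_path, "tpch", 0.01))
    # /workspace/symlinks/dbgym/tpch_sf0point01_pristine_dbdata.tgz.link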
+ DBGymWorkspace._num_times_created_this_run = 0 + PostgresConnTests.workspace = DBGymWorkspace( + GymlibIntegtestManager.get_workspace_path() + ) def setUp(self) -> None: self.assertFalse( @@ -38,7 +46,7 @@ def tearDown(self) -> None: def create_pg_conn(self, pgport: int = DEFAULT_POSTGRES_PORT) -> PostgresConn: return PostgresConn( - GymlibIntegtestManager.get_dbgym_workspace(), + PostgresConnTests.workspace, pgport, self.metadata.pristine_dbdata_snapshot_path, self.metadata.dbdata_parent_path, diff --git a/env/tests/integtest_replay.py b/env/tests/integtest_replay.py index 9523fac7..1752e192 100644 --- a/env/tests/integtest_replay.py +++ b/env/tests/integtest_replay.py @@ -10,16 +10,24 @@ SysKnobsDelta, TuningArtifactsWriter, ) +from util.workspace import DBGymWorkspace class ReplayTests(unittest.TestCase): + workspace: DBGymWorkspace + @staticmethod def setUpClass() -> None: GymlibIntegtestManager.set_up_workspace() + # Reset _num_times_created_this_run since previous tests may have created a workspace. + DBGymWorkspace._num_times_created_this_run = 0 + ReplayTests.workspace = DBGymWorkspace( + GymlibIntegtestManager.get_workspace_path() + ) def test_replay(self) -> None: writer = TuningArtifactsWriter( - GymlibIntegtestManager.get_dbgym_workspace(), + ReplayTests.workspace, GymlibIntegtestManager.get_default_metadata(), ) writer.write_step( @@ -41,8 +49,8 @@ def test_replay(self) -> None: ) ) replay_data = replay( - GymlibIntegtestManager.get_dbgym_workspace(), - writer.tuning_artifacts_dpath, + ReplayTests.workspace, + writer.tuning_artifacts_path, ) # We do some very simple sanity checks here due to the inherent randomness of executing a workload. diff --git a/env/tests/integtest_tuning_artifacts.py b/env/tests/integtest_tuning_artifacts.py index dfdc77c0..1baa1fdb 100644 --- a/env/tests/integtest_tuning_artifacts.py +++ b/env/tests/integtest_tuning_artifacts.py @@ -9,6 +9,7 @@ TuningArtifactsReader, TuningArtifactsWriter, ) +from util.workspace import DBGymWorkspace class PostgresConnTests(unittest.TestCase): @@ -16,6 +17,11 @@ class PostgresConnTests(unittest.TestCase): def setUpClass() -> None: GymlibIntegtestManager.set_up_workspace() + def setUp(self) -> None: + # We re-create a workspace for each test because each test will create its own TuningArtifactsWriter. 
+ DBGymWorkspace._num_times_created_this_run = 0 + self.workspace = DBGymWorkspace(GymlibIntegtestManager.get_workspace_path()) + @staticmethod def make_config(letter: str) -> DBMSConfigDelta: return DBMSConfigDelta( @@ -26,7 +32,7 @@ def make_config(letter: str) -> DBMSConfigDelta: def test_get_delta_at_step(self) -> None: writer = TuningArtifactsWriter( - GymlibIntegtestManager.get_dbgym_workspace(), + self.workspace, GymlibIntegtestManager.get_default_metadata(), ) @@ -34,7 +40,7 @@ def test_get_delta_at_step(self) -> None: writer.write_step(PostgresConnTests.make_config("b")) writer.write_step(PostgresConnTests.make_config("c")) - reader = TuningArtifactsReader(writer.tuning_artifacts_dpath) + reader = TuningArtifactsReader(writer.tuning_artifacts_path) self.assertEqual( reader.get_delta_at_step(1), PostgresConnTests.make_config("b") @@ -51,7 +57,7 @@ def test_get_delta_at_step(self) -> None: def test_get_all_deltas_in_order(self) -> None: writer = TuningArtifactsWriter( - GymlibIntegtestManager.get_dbgym_workspace(), + self.workspace, GymlibIntegtestManager.get_default_metadata(), ) @@ -59,7 +65,7 @@ def test_get_all_deltas_in_order(self) -> None: writer.write_step(PostgresConnTests.make_config("b")) writer.write_step(PostgresConnTests.make_config("c")) - reader = TuningArtifactsReader(writer.tuning_artifacts_dpath) + reader = TuningArtifactsReader(writer.tuning_artifacts_path) self.assertEqual( reader.get_all_deltas_in_order(), @@ -72,10 +78,10 @@ def test_get_all_deltas_in_order(self) -> None: def test_get_metadata(self) -> None: writer = TuningArtifactsWriter( - GymlibIntegtestManager.get_dbgym_workspace(), + self.workspace, GymlibIntegtestManager.get_default_metadata(), ) - reader = TuningArtifactsReader(writer.tuning_artifacts_dpath) + reader = TuningArtifactsReader(writer.tuning_artifacts_path) metadata = reader.get_metadata() expected_metadata = GymlibIntegtestManager.get_default_metadata() self.assertEqual(metadata, expected_metadata) diff --git a/env/tests/integtest_workload.py b/env/tests/integtest_workload.py index ee6fc068..494e0365 100644 --- a/env/tests/integtest_workload.py +++ b/env/tests/integtest_workload.py @@ -3,34 +3,24 @@ from benchmark.tpch.constants import DEFAULT_TPCH_SEED, NUM_TPCH_QUERIES from env.tests.gymlib_integtest_util import GymlibIntegtestManager from env.workload import Workload -from util.workspace import ( - fully_resolve_path, - get_default_workload_name_suffix, - get_default_workload_path, - get_workload_name, -) +from util.workspace import DBGymWorkspace class WorkloadTests(unittest.TestCase): + workspace: DBGymWorkspace + @staticmethod def setUpClass() -> None: GymlibIntegtestManager.set_up_workspace() - - def test_workload(self) -> None: - workload_dpath = fully_resolve_path( - get_default_workload_path( - GymlibIntegtestManager.get_dbgym_workspace().dbgym_workspace_path, - GymlibIntegtestManager.BENCHMARK, - get_workload_name( - GymlibIntegtestManager.SCALE_FACTOR, - get_default_workload_name_suffix(GymlibIntegtestManager.BENCHMARK), - ), - ), + # Reset _num_times_created_this_run since previous tests may have created a workspace. 
+ DBGymWorkspace._num_times_created_this_run = 0 + WorkloadTests.workspace = DBGymWorkspace( + GymlibIntegtestManager.get_workspace_path() ) - workload = Workload( - GymlibIntegtestManager.get_dbgym_workspace(), workload_dpath - ) + def test_workload(self) -> None: + workload_path = GymlibIntegtestManager.get_default_metadata().workload_path + workload = Workload(WorkloadTests.workspace, workload_path) # Check the order of query IDs. self.assertEqual( diff --git a/env/tuning_artifacts.py b/env/tuning_artifacts.py index a60d6bbe..ab26e3fd 100644 --- a/env/tuning_artifacts.py +++ b/env/tuning_artifacts.py @@ -1,7 +1,7 @@ import json from dataclasses import asdict, dataclass from pathlib import Path -from typing import Any, NewType, TypedDict +from typing import Any, NewType from util.workspace import DBGymWorkspace, is_fully_resolved @@ -68,12 +68,12 @@ class DBMSConfigDelta: qknobs: QueryKnobsDelta -def get_delta_at_step_fpath(tuning_artifacts_dpath: Path, step_num: int) -> Path: - return tuning_artifacts_dpath / f"step{step_num}_delta.json" +def get_delta_at_step_path(tuning_artifacts_path: Path, step_num: int) -> Path: + return tuning_artifacts_path / f"step{step_num}_delta.json" -def get_metadata_fpath(tuning_artifacts_dpath: Path) -> Path: - return tuning_artifacts_dpath / "metadata.json" +def get_metadata_path(tuning_artifacts_path: Path) -> Path: + return tuning_artifacts_path / "metadata.json" class TuningArtifactsWriter: @@ -81,14 +81,16 @@ def __init__( self, dbgym_workspace: DBGymWorkspace, metadata: TuningMetadata ) -> None: self.dbgym_workspace = dbgym_workspace - self.tuning_artifacts_dpath = self.dbgym_workspace.cur_task_runs_artifacts_path( - "tuning_artifacts", mkdir=True + self.tuning_artifacts_path = ( + self.dbgym_workspace.dbgym_this_run_path / "tuning_artifacts" ) - assert is_fully_resolved(self.tuning_artifacts_dpath) + # exist_ok is False because you should only create one TuningArtifactsWriter per run. 
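For orientation, the directory created below ends up holding one metadata.json plus one stepN_delta.json per write_step() call, following get_metadata_path() and get_delta_at_step_path() above. A small sketch (the run directory name is hypothetical):

    from pathlib import Path

    def expected_artifact_files(tuning_artifacts_path: Path, num_steps: int) -> list[Path]:
        # Mirrors get_metadata_path() and get_delta_at_step_path() defined above.
        return [tuning_artifacts_path / "metadata.json"] + [
            tuning_artifacts_path / f"step{i}_delta.json" for i in range(num_steps)
        ]

    print(expected_artifact_files(Path("run_2024-01-01/tuning_artifacts"), 2))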
+ self.tuning_artifacts_path.mkdir(parents=False, exist_ok=False) + assert is_fully_resolved(self.tuning_artifacts_path) self.next_step_num = 0 # Write metadata file - with get_metadata_fpath(self.tuning_artifacts_dpath).open("w") as f: + with get_metadata_path(self.tuning_artifacts_path).open("w") as f: json.dump(metadata.asdict(), f) def write_step(self, dbms_cfg_delta: DBMSConfigDelta) -> None: @@ -97,23 +99,23 @@ def write_step(self, dbms_cfg_delta: DBMSConfigDelta) -> None: """ curr_step_num = self.next_step_num self.next_step_num += 1 - with get_delta_at_step_fpath(self.tuning_artifacts_dpath, curr_step_num).open( + with get_delta_at_step_path(self.tuning_artifacts_path, curr_step_num).open( "w" ) as f: json.dump(asdict(dbms_cfg_delta), f) class TuningArtifactsReader: - def __init__(self, tuning_artifacts_dpath: Path) -> None: - self.tuning_artifacts_dpath = tuning_artifacts_dpath - assert is_fully_resolved(self.tuning_artifacts_dpath) + def __init__(self, tuning_artifacts_path: Path) -> None: + self.tuning_artifacts_path = tuning_artifacts_path + assert is_fully_resolved(self.tuning_artifacts_path) num_steps = 0 - while get_delta_at_step_fpath(self.tuning_artifacts_dpath, num_steps).exists(): + while get_delta_at_step_path(self.tuning_artifacts_path, num_steps).exists(): num_steps += 1 self.num_steps = num_steps def get_metadata(self) -> TuningMetadata: - with get_metadata_fpath(self.tuning_artifacts_dpath).open("r") as f: + with get_metadata_path(self.tuning_artifacts_path).open("r") as f: data = json.load(f) return TuningMetadata( workload_path=Path(data["workload_path"]), @@ -126,7 +128,7 @@ def get_metadata(self) -> TuningMetadata: def get_delta_at_step(self, step_num: int) -> DBMSConfigDelta: assert step_num >= 0 and step_num < self.num_steps - with get_delta_at_step_fpath(self.tuning_artifacts_dpath, step_num).open( + with get_delta_at_step_path(self.tuning_artifacts_path, step_num).open( "r" ) as f: data = json.load(f) diff --git a/env/workload.py b/env/workload.py index 89dde426..4b164ffc 100644 --- a/env/workload.py +++ b/env/workload.py @@ -1,27 +1,27 @@ from pathlib import Path -from util.workspace import DBGymWorkspace, is_fully_resolved, open_and_save +from util.workspace import DBGymWorkspace, is_fully_resolved class Workload: - def __init__(self, dbgym_workspace: DBGymWorkspace, workload_dpath: Path) -> None: + def __init__(self, dbgym_workspace: DBGymWorkspace, workload_path: Path) -> None: self.dbgym_workspace = dbgym_workspace - self.workload_dpath = workload_dpath - assert is_fully_resolved(self.workload_dpath) + self.workload_path = workload_path + assert is_fully_resolved(self.workload_path) self.queries: dict[str, str] = {} - order_fpath = self.workload_dpath / "order.txt" + order_path = self.workload_path / "order.txt" self.query_order: list[str] = [] - assert order_fpath.exists() + assert order_path.exists() - with open_and_save(self.dbgym_workspace, order_fpath) as f: + with self.dbgym_workspace.open_and_save(order_path) as f: for line in f: qid, qpath = line.strip().split(",") qpath = Path(qpath) assert is_fully_resolved(qpath) - with open_and_save(self.dbgym_workspace, qpath) as qf: + with self.dbgym_workspace.open_and_save(qpath) as qf: self.queries[qid] = qf.read() self.query_order.append(qid) diff --git a/experiments/load_per_machine_envvars.sh b/experiments/load_per_machine_envvars.sh index 22b220c8..1a992195 100644 --- a/experiments/load_per_machine_envvars.sh +++ b/experiments/load_per_machine_envvars.sh @@ -2,13 +2,13 @@ host=$(hostname) if [ 
"$host" == "dev4" ]; then - export DBDATA_PARENT_DPATH=/mnt/nvme1n1/phw2/dbgym_tmp/ + export DBDATA_PARENT_PATH=/mnt/nvme1n1/phw2/dbgym_tmp/ export INTENDED_DBDATA_HARDWARE=ssd elif [ "$host" == "dev6" ]; then - export DBDATA_PARENT_DPATH=/mnt/nvme0n1/phw2/dbgym_tmp/ + export DBDATA_PARENT_PATH=/mnt/nvme0n1/phw2/dbgym_tmp/ export INTENDED_DBDATA_HARDWARE=ssd elif [ "$host" == "patnuc" ]; then - export DBDATA_PARENT_DPATH=../dbgym_workspace/tmp/ + export DBDATA_PARENT_PATH=../dbgym_workspace/tmp/ export INTENDED_DBDATA_HARDWARE=hdd else echo "Did not recognize host \"$host\"" diff --git a/gymlib_package/gymlib/__init__.py b/gymlib_package/gymlib/__init__.py index 30e9d5c3..99257dda 100644 --- a/gymlib_package/gymlib/__init__.py +++ b/gymlib_package/gymlib/__init__.py @@ -1 +1 @@ -from . import magic +from . import magic, symlinks_paths diff --git a/gymlib_package/gymlib/symlinks_paths.py b/gymlib_package/gymlib/symlinks_paths.py new file mode 100644 index 00000000..90736adf --- /dev/null +++ b/gymlib_package/gymlib/symlinks_paths.py @@ -0,0 +1,99 @@ +from pathlib import Path +from typing import Any + +# TODO: move these into workspace.py and move workspace.py into gymlib. +SYMLINKS_DNAME = "symlinks" +DBGYM_APP_NAME = "dbgym" + +SCALE_FACTOR_PLACEHOLDER: str = "[scale_factor]" +BENCHMARK_NAME_PLACEHOLDER: str = "[benchmark_name]" +WORKLOAD_NAME_PLACEHOLDER: str = "[workload_name]" + + +def get_scale_factor_string(scale_factor: float | str) -> str: + if type(scale_factor) is str and scale_factor == SCALE_FACTOR_PLACEHOLDER: + return scale_factor + else: + if float(int(scale_factor)) == scale_factor: + return str(int(scale_factor)) + else: + return str(scale_factor).replace(".", "point") + + +def get_tables_dirname(benchmark: str, scale_factor: float | str) -> str: + return f"tables_{benchmark}_sf{get_scale_factor_string(scale_factor)}" + + +def get_workload_suffix(benchmark: str, **kwargs: Any) -> str: + if benchmark == "tpch": + assert kwargs.keys() == {"seed_start", "seed_end", "query_subset"} + return f"{kwargs['seed_start']}_{kwargs['seed_end']}_{kwargs['query_subset']}" + elif benchmark == "job": + assert kwargs.keys() == {"query_subset"} + return f"{kwargs['query_subset']}" + else: + assert False + + +def get_workload_dirname(benchmark: str, scale_factor: float | str, suffix: str) -> str: + return f"workload_{benchmark}_sf{get_scale_factor_string(scale_factor)}_{suffix}" + + +def get_dbdata_tgz_filename(benchmark_name: str, scale_factor: float | str) -> str: + return f"{benchmark_name}_sf{get_scale_factor_string(scale_factor)}_pristine_dbdata.tgz" + + +def get_tables_symlink_path( + workspace_path: Path, benchmark: str, scale_factor: float | str +) -> Path: + return ( + workspace_path + / SYMLINKS_DNAME + / DBGYM_APP_NAME + / name_to_linkname(get_tables_dirname(benchmark, scale_factor)) + ) + + +def get_workload_symlink_path( + workspace_path: Path, benchmark: str, scale_factor: float | str, suffix: str +) -> Path: + return ( + workspace_path + / SYMLINKS_DNAME + / DBGYM_APP_NAME + / name_to_linkname(get_workload_dirname(benchmark, scale_factor, suffix)) + ) + + +def get_repo_symlink_path(workspace_path: Path) -> Path: + return workspace_path / SYMLINKS_DNAME / DBGYM_APP_NAME / "repo.link" + + +def get_pgbin_symlink_path(workspace_path: Path) -> Path: + return get_repo_symlink_path(workspace_path) / "boot" / "build" / "postgres" / "bin" + + +def get_dbdata_tgz_symlink_path( + workspace_path: Path, benchmark_name: str, scale_factor: float | str +) -> Path: + return ( + workspace_path + 
/ SYMLINKS_DNAME + / DBGYM_APP_NAME + / name_to_linkname(get_dbdata_tgz_filename(benchmark_name, scale_factor)) + ) + + +def is_linkname(name: str) -> bool: + assert not name.endswith(".link.link") + return name.endswith(".link") + + +def name_to_linkname(name: str) -> str: + assert not is_linkname(name) + return f"{name}.link" + + +def linkname_to_name(linkname: str) -> str: + assert is_linkname(linkname) + return linkname[: -len(".link")] diff --git a/lab.py b/lab.py deleted file mode 100644 index 77595ff3..00000000 --- a/lab.py +++ /dev/null @@ -1,3 +0,0 @@ -import gymlib - -print(gymlib.magic.get_magic_number()) diff --git a/manage/cli.py b/manage/cli.py index 5d785d9f..4f41cbc7 100644 --- a/manage/cli.py +++ b/manage/cli.py @@ -12,7 +12,7 @@ get_runs_path_from_workspace_path, get_symlinks_path_from_workspace_path, is_child_path, - parent_dpath_of_path, + parent_path_of_path, ) @@ -52,13 +52,13 @@ def manage_count(dbgym_workspace: DBGymWorkspace) -> None: ) -def add_symlinks_in_dpath( - symlinks_stack: list[Path], root_dpath: Path, processed_symlinks: set[Path] +def add_symlinks_in_path( + symlinks_stack: list[Path], root_path: Path, processed_symlinks: set[Path] ) -> None: """ Will modify symlinks_stack and processed_symlinks. """ - for root_pathstr, dir_names, file_names in os.walk(root_dpath): + for root_pathstr, dir_names, file_names in os.walk(root_path): root_path = Path(root_pathstr) # symlinks can either be files or directories, so we go through both dir_names and file_names for file_name in chain(dir_names, file_names): @@ -101,14 +101,14 @@ def clean_workspace( any symlinks referenced in task_runs/run_*/ directories we have already decided to keep. """ # This stack holds the symlinks that are left to be processed - symlink_fpaths_to_process: list[Path] = [] + symlink_paths_to_process: list[Path] = [] # This set holds the symlinks that have already been processed to avoid infinite loops processed_symlinks: set[Path] = set() # 1. Initialize paths to process if dbgym_workspace.dbgym_symlinks_path.exists(): - add_symlinks_in_dpath( - symlink_fpaths_to_process, + add_symlinks_in_path( + symlink_paths_to_process, dbgym_workspace.dbgym_symlinks_path, processed_symlinks, ) @@ -116,64 +116,60 @@ def clean_workspace( # 2. Go through symlinks, figuring out which "children of task runs" to keep # Based on the rules of the framework, "children of task runs" should be run_*/ directories. # However, the user's workspace might happen to break these rules by putting directories not - # named "run_*/" or files directly in task_runs/. Thus, I use the term "task_run_child_fordpaths" - # instead of "run_dpaths". - task_run_child_fordpaths_to_keep = set() + # named "run_*/" or files directly in task_runs/. Thus, I use the term "task_run_child_paths" + # instead of "run_paths". + task_run_child_paths_to_keep = set() if dbgym_workspace.dbgym_runs_path.exists(): - while symlink_fpaths_to_process: - symlink_fpath: Path = symlink_fpaths_to_process.pop() - assert symlink_fpath.is_symlink() + while symlink_paths_to_process: + symlink_path: Path = symlink_paths_to_process.pop() + assert symlink_path.is_symlink() # Path.resolve() resolves all layers of symlinks while os.readlink() only resolves one layer. # However, os.readlink() literally reads the string contents of the link. 
We need to do some # processing on the result of os.readlink() to convert it to an absolute path - real_fordpath = symlink_fpath.resolve() - one_layer_resolved_fordpath = os.readlink(symlink_fpath) - assert str(real_fordpath) == str( - os.readlink(symlink_fpath) - ), f"symlink_fpath ({symlink_fpath}) seems to point to *another* symlink. This is difficult to handle, so it is currently disallowed. Please resolve this situation manually." + real_path = symlink_path.resolve() + one_layer_resolved_path = os.readlink(symlink_path) + assert str(real_path) == str( + os.readlink(symlink_path) + ), f"symlink_path ({symlink_path}) seems to point to *another* symlink. This is difficult to handle, so it is currently disallowed. Please resolve this situation manually." # If the file doesn't exist, we'll just ignore it. - if not real_fordpath.exists(): + if not real_path.exists(): continue # We're only trying to figure out which direct children of task_runs/ to save. If the file isn't # even a descendant, we don't care about it. - if not is_child_path(real_fordpath, dbgym_workspace.dbgym_runs_path): + if not is_child_path(real_path, dbgym_workspace.dbgym_runs_path): continue - assert not real_fordpath.samefile(dbgym_workspace.dbgym_runs_path) + assert not real_path.samefile(dbgym_workspace.dbgym_runs_path) - # Figure out the task_run_child_fordpath to put into task_run_child_fordpaths_to_keep - task_run_child_fordpath = None - if parent_dpath_of_path(real_fordpath).samefile( - dbgym_workspace.dbgym_runs_path - ): + # Figure out the task_run_child_path to put into task_run_child_paths_to_keep + task_run_child_path = None + if parent_path_of_path(real_path).samefile(dbgym_workspace.dbgym_runs_path): # While it's true that it shouldn't be possible to symlink to a directory directly in task_runs/, # we'll just not delete it if the user happens to have one like this. Even if the user messed up # the structure somehow, it's just a good idea not to delete it. - task_run_child_fordpath = real_fordpath + task_run_child_path = real_path else: # Technically, it's not allowed to symlink to any files not in task_runs/run_*/[codebase]/[organization]/. # However, as with above, we won't just nuke files if the workspace doesn't follow this rule for # some reason. 
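The loop below walks a resolved symlink target upward until it reaches the direct child of task_runs/ that should be kept. A standalone sketch of that walk (hypothetical paths; the real code uses parent_path_of_path() and samefile(), and assumes real_path lies under task_runs/):

    from pathlib import Path

    runs_path = Path("/workspace/task_runs")
    real_path = Path("/workspace/task_runs/run_2024-01-01/dbgym/artifacts/result.txt")

    task_run_child_path = real_path
    while task_run_child_path.parent != runs_path:
        task_run_child_path = task_run_child_path.parent
    print(task_run_child_path)  # /workspace/task_runs/run_2024-01-01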
- task_run_child_fordpath = real_fordpath - while not parent_dpath_of_path(task_run_child_fordpath).samefile( + task_run_child_path = real_path + while not parent_path_of_path(task_run_child_path).samefile( dbgym_workspace.dbgym_runs_path ): - task_run_child_fordpath = parent_dpath_of_path( - task_run_child_fordpath - ) - assert task_run_child_fordpath != None - assert parent_dpath_of_path(task_run_child_fordpath).samefile( + task_run_child_path = parent_path_of_path(task_run_child_path) + assert task_run_child_path != None + assert parent_path_of_path(task_run_child_path).samefile( dbgym_workspace.dbgym_runs_path - ), f"task_run_child_fordpath ({task_run_child_fordpath}) is not a direct child of dbgym_workspace.dbgym_runs_path" - task_run_child_fordpaths_to_keep.add(task_run_child_fordpath) + ), f"task_run_child_path ({task_run_child_path}) is not a direct child of dbgym_workspace.dbgym_runs_path" + task_run_child_paths_to_keep.add(task_run_child_path) - # If on safe mode, add symlinks inside the task_run_child_fordpath to be processed + # If on safe mode, add symlinks inside the task_run_child_path to be processed if mode == "safe": - add_symlinks_in_dpath( - symlink_fpaths_to_process, - task_run_child_fordpath, + add_symlinks_in_path( + symlink_paths_to_process, + task_run_child_path, processed_symlinks, ) @@ -181,12 +177,12 @@ def clean_workspace( # It's true that symlinks might link outside of task_runs/*. We'll just not care about those starting_num_files = _count_files_in_workspace(dbgym_workspace) if dbgym_workspace.dbgym_runs_path.exists(): - for child_fordpath in dbgym_workspace.dbgym_runs_path.iterdir(): - if child_fordpath not in task_run_child_fordpaths_to_keep: - if child_fordpath.is_dir(): - shutil.rmtree(child_fordpath) + for child_path in dbgym_workspace.dbgym_runs_path.iterdir(): + if child_path not in task_run_child_paths_to_keep: + if child_path.is_dir(): + shutil.rmtree(child_path) else: - os.remove(child_fordpath) + os.remove(child_path) ending_num_files = _count_files_in_workspace(dbgym_workspace) if verbose: diff --git a/scripts/pat_test.sh b/scripts/pat_test.sh index dcbb9a90..c15a20c3 100755 --- a/scripts/pat_test.sh +++ b/scripts/pat_test.sh @@ -13,4 +13,4 @@ python3 task.py benchmark job workload --query-subset demo # postgres python3 task.py dbms postgres build -python3 task.py dbms postgres dbdata job --intended-dbdata-hardware $INTENDED_DBDATA_HARDWARE --dbdata-parent-dpath $DBDATA_PARENT_DPATH \ No newline at end of file +python3 task.py dbms postgres dbdata job --intended-dbdata-hardware $INTENDED_DBDATA_HARDWARE --dbdata-parent-path $DBDATA_PARENT_PATH \ No newline at end of file diff --git a/task.py b/task.py index 1205999b..f5a0e278 100644 --- a/task.py +++ b/task.py @@ -18,9 +18,9 @@ def task(ctx: click.Context) -> None: dbgym_workspace = make_standard_dbgym_workspace() ctx.obj = dbgym_workspace - log_dpath = dbgym_workspace.cur_task_runs_artifacts_path(mkdir=True) - set_up_loggers(log_dpath) - set_up_warnings(log_dpath) + log_path = dbgym_workspace.dbgym_this_run_path + set_up_loggers(log_path) + set_up_warnings(log_path) if __name__ == "__main__": diff --git a/util/log.py b/util/log.py index 271fd92f..ae0ca936 100644 --- a/util/log.py +++ b/util/log.py @@ -8,7 +8,7 @@ DBGYM_OUTPUT_LOGGER_NAME = f"{DBGYM_LOGGER_NAME}.output" -def set_up_loggers(log_dpath: Path) -> None: +def set_up_loggers(log_path: Path) -> None: """ Set up everything related to the logging library. 
@@ -21,7 +21,7 @@ def set_up_loggers(log_dpath: Path) -> None: _set_up_logger( logging.getLogger(DBGYM_LOGGER_NAME), log_format, - log_dpath / f"{DBGYM_LOGGER_NAME}.log", + log_path / f"{DBGYM_LOGGER_NAME}.log", ) # The output logger is meant to output things to the console. We use it instead of using print to indicate that something is @@ -40,7 +40,7 @@ def set_up_loggers(log_dpath: Path) -> None: def _set_up_logger( logger: Logger, format: str, - output_log_fpath: Optional[Path], + output_log_path: Optional[Path], console_level: int = logging.ERROR, file_level: int = logging.DEBUG, ) -> None: @@ -55,18 +55,18 @@ def _set_up_logger( logger.addHandler(console_handler) # Let it output everything to the output file. - if output_log_fpath is not None: - file_handler = logging.FileHandler(output_log_fpath) + if output_log_path is not None: + file_handler = logging.FileHandler(output_log_path) file_handler.setFormatter(formatter) file_handler.setLevel(file_level) logger.addHandler(file_handler) -def set_up_warnings(log_dpath: Path) -> None: +def set_up_warnings(log_path: Path) -> None: """ Some libraries (like torch) use warnings instead of logging for warnings. I want to redirect these too to avoid cluttering the console. """ - warnings_fpath = log_dpath / "warnings.log" + warnings_path = log_path / "warnings.log" def write_warning_to_file( message: Any, @@ -76,7 +76,7 @@ def write_warning_to_file( file: Optional[Any] = None, line: Optional[Any] = None, ) -> None: - with open(warnings_fpath, "a") as f: + with open(warnings_path, "a") as f: f.write(f"{filename}:{lineno}: {category.__name__}: {message}\n") warnings.showwarning = write_warning_to_file diff --git a/util/pg.py b/util/pg.py index 32f2fdee..23c06b60 100644 --- a/util/pg.py +++ b/util/pg.py @@ -11,7 +11,7 @@ import sqlalchemy from sqlalchemy import create_engine, text -from util.workspace import DBGymWorkspace, open_and_save +from util.workspace import DBGymWorkspace DBGYM_POSTGRES_USER = "dbgym_user" DBGYM_POSTGRES_PASS = "dbgym_pass" @@ -28,7 +28,7 @@ def sqlalchemy_conn_execute( def sql_file_queries(dbgym_workspace: DBGymWorkspace, filepath: Path) -> list[str]: - with open_and_save(dbgym_workspace, filepath) as f: + with dbgym_workspace.open_and_save(filepath) as f: lines: list[str] = [] for line in f: if line.startswith("--"): diff --git a/util/tests/unittest_workspace.py b/util/tests/unittest_workspace.py index 89a1c6c1..0575b8b3 100644 --- a/util/tests/unittest_workspace.py +++ b/util/tests/unittest_workspace.py @@ -40,8 +40,8 @@ def tearDown(self) -> None: # Importantly though, I don't have helper functions for the complex functions that I want to test (e.g. link_result and save_file). def init_workspace_helper(self) -> None: # Reset this to avoid the error of it being created twice. - # In real usage, the second run would be a different Python process so DBGymWorkspace.num_times_created_this_run would be 0. - DBGymWorkspace.num_times_created_this_run = 0 + # In real usage, the second run would be a different Python process so DBGymWorkspace._num_times_created_this_run would be 0. 
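A self-contained sketch of the once-per-process pattern this reset works around, using a simplified stand-in class rather than the real DBGymWorkspace (see the counter-and-assert logic in util/workspace.py below):

    class OncePerProcess:
        _num_times_created_this_run = 0

        def __init__(self) -> None:
            OncePerProcess._num_times_created_this_run += 1
            assert OncePerProcess._num_times_created_this_run == 1, "create this once per run"

    first = OncePerProcess()
    OncePerProcess._num_times_created_this_run = 0  # what the tests do before constructing again
    second = OncePerProcess()                       # succeeds only because of the reset above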
+ DBGymWorkspace._num_times_created_this_run = 0 self.workspace = DBGymWorkspace(self.workspace_path) if self.expected_structure is None: @@ -273,7 +273,7 @@ def test_link_result_cannot_link_symlink(self) -> None: ) with self.assertRaisesRegex( AssertionError, - "result_fordpath \(.*\) should be a fully resolved path", + "result_path \(.*\) should be a fully resolved path", ): self.workspace.link_result(symlink_path) @@ -452,7 +452,7 @@ def test_save_file_generated_this_run_raises_error(self) -> None: result_path = self.make_result_helper() with self.assertRaisesRegex( AssertionError, - "fpath \(.*\) was generated in this task run \(.*\)\. You do not need to save it", + "path \(.*\) was generated in this task run \(.*\)\. You do not need to save it", ): self.workspace.save_file(result_path) diff --git a/util/workspace.py b/util/workspace.py index 11e0d620..32bfd03d 100644 --- a/util/workspace.py +++ b/util/workspace.py @@ -11,20 +11,11 @@ from pathlib import Path from typing import IO, Any, Optional -import redis import yaml +from gymlib.symlinks_paths import is_linkname, name_to_linkname -from benchmark.tpch.constants import DEFAULT_TPCH_SEED from util.log import DBGYM_LOGGER_NAME -from util.shell import subprocess_run -# Relative paths of different folders in the codebase -DBMS_PATH = Path("dbms") -POSTGRES_PATH = DBMS_PATH / "postgres" -TUNE_PATH = Path("tune") - -# Paths of different parts of the workspace -# I made these Path objects even though they're not real paths just so they can work correctly with my other helper functions WORKSPACE_PATH_PLACEHOLDER = Path("[workspace]") @@ -45,111 +36,9 @@ def get_latest_run_path_from_workspace_path(workspace_path: Path) -> Path: return get_runs_path_from_workspace_path(workspace_path) / "latest_run.link" -def get_scale_factor_string(scale_factor: float | str) -> str: - if type(scale_factor) is str and scale_factor == SCALE_FACTOR_PLACEHOLDER: - return scale_factor - else: - if float(int(scale_factor)) == scale_factor: - return str(int(scale_factor)) - else: - return str(scale_factor).replace(".", "point") - - -def get_dbdata_tgz_name(benchmark_name: str, scale_factor: float | str) -> str: - return f"{benchmark_name}_sf{get_scale_factor_string(scale_factor)}_pristine_dbdata.tgz" - - -# Other parameters -BENCHMARK_NAME_PLACEHOLDER: str = "[benchmark_name]" -WORKLOAD_NAME_PLACEHOLDER: str = "[workload_name]" -SCALE_FACTOR_PLACEHOLDER: str = "[scale_factor]" - # Paths of config files in the codebase. These are always relative paths. # The reason these can be relative paths instead of functions taking in codebase_path as input is because relative paths are relative to the codebase root -DEFAULT_BOOT_CONFIG_FPATH = POSTGRES_PATH / "default_boot_config.yaml" - - -# Generally useful functions -def get_workload_name(scale_factor: float | str, suffix: str) -> str: - return f"workload_sf{get_scale_factor_string(scale_factor)}_{suffix}" - - -def get_default_workload_name_suffix(benchmark_name: str) -> str: - if benchmark_name == "tpch": - return f"{DEFAULT_TPCH_SEED}_{DEFAULT_TPCH_SEED}_all" - if benchmark_name == "job": - return "all" - else: - assert False - - -# Standard names of files/directories. These can refer to either the actual file/directory or a link to the file/directory. -# Since they can refer to either the actual or the link, they do not have ".link" in them. -def get_default_tables_dname(scale_factor: float | str) -> str: - return f"tables_sf{get_scale_factor_string(scale_factor)}" - - -# Paths of dependencies in the workspace. 
These are named "*_path" because they will be an absolute path -# The reason these _cannot_ be relative paths is because relative paths are relative to the codebase root, not the workspace root -# Note that it's okay to hardcode the codebase paths (like dbgym_dbms_postgres) here. In the worst case, we'll just break an -# integration test. The "source of truth" of codebase paths is based on DBGymWorkspace.cur_source_path(), which will always -# reflect the actual codebase structure. As long as we automatically enforce getting the right codebase paths when writing, it's -# ok to have to hardcode them when reading. -# Details -# - If a name already has the workload_name, I omit scale factor. This is because the workload_name includes the scale factor -# - By convention, symlinks should end with ".link". The bug that motivated this decision involved replaying a tuning run. When -# replaying a tuning run, you read the tuning_steps/ folder of the tuning run. Earlier, I created a symlink to that tuning_steps/ -# folder called run_*/*/tuning_steps. However, replay itself generates an replay_info.log file, which goes in -# run_*/*/tuning_steps/. The bug was that my replay function was overwriting the replay_info.log file of the -# tuning run. By naming all symlinks "*.link", we avoid the possibility of subtle bugs like this happening. -def get_default_tables_path( - workspace_path: Path, benchmark_name: str, scale_factor: float | str -) -> Path: - return ( - get_symlinks_path_from_workspace_path(workspace_path) - / f"dbgym_benchmark_{benchmark_name}" - / "data" - / (get_default_tables_dname(scale_factor) + ".link") - ) - - -def get_default_workload_path( - workspace_path: Path, benchmark_name: str, workload_name: str -) -> Path: - return ( - get_symlinks_path_from_workspace_path(workspace_path) - / f"dbgym_benchmark_{benchmark_name}" - / "data" - / (workload_name + ".link") - ) - - -def get_default_pristine_dbdata_snapshot_path( - workspace_path: Path, benchmark_name: str, scale_factor: float | str -) -> Path: - return ( - get_symlinks_path_from_workspace_path(workspace_path) - / "dbgym_dbms_postgres" - / "data" - / (get_dbdata_tgz_name(benchmark_name, scale_factor) + ".link") - ) - - -def get_default_dbdata_parent_dpath(workspace_path: Path) -> Path: - return get_tmp_path_from_workspace_path(workspace_path) - - -def get_default_repo_path(workspace_path: Path) -> Path: - return ( - get_symlinks_path_from_workspace_path(workspace_path) - / "dbgym_dbms_postgres" - / "build" - / "repo.link" - ) - - -def get_default_pgbin_path(workspace_path: Path) -> Path: - return get_default_repo_path(workspace_path) / "boot" / "build" / "postgres" / "bin" +DEFAULT_BOOT_CONFIG_PATH = Path("dbms") / "postgres" / "default_boot_config.yaml" class DBGymWorkspace: @@ -157,17 +46,17 @@ class DBGymWorkspace: Global configurations that apply to all parts of DB-Gym """ - num_times_created_this_run: int = 0 + _num_times_created_this_run: int = 0 def __init__(self, dbgym_workspace_path: Path): # The logic around dbgym_tmp_path assumes that DBGymWorkspace is only constructed once. - DBGymWorkspace.num_times_created_this_run += 1 + # This is because DBGymWorkspace creates a new run_*/ dir when it's initialized. + DBGymWorkspace._num_times_created_this_run += 1 assert ( - DBGymWorkspace.num_times_created_this_run == 1 - ), f"DBGymWorkspace has been created {DBGymWorkspace.num_times_created_this_run} times. It should only be created once per run." 
+ DBGymWorkspace._num_times_created_this_run == 1 + ), f"DBGymWorkspace has been created {DBGymWorkspace._num_times_created_this_run} times. It should only be created once per run." - self.base_dbgym_repo_dpath = get_base_dbgym_repo_dpath() - self.cur_path_list: list[str] = ["dbgym"] + self.base_dbgym_repo_path = get_base_dbgym_repo_path() self.app_name = ( "dbgym" # TODO: discover this dynamically. app means dbgym or an agent ) @@ -187,6 +76,7 @@ def __init__(self, dbgym_workspace_path: Path): self.dbgym_workspace_path ) self.dbgym_symlinks_path.mkdir(parents=True, exist_ok=True) + self.dbgym_cur_symlinks_path = self.dbgym_symlinks_path / self.app_name # tmp/ is a workspace for this run only # One use for it is to place the unzipped dbdata. # There's no need to save the actual dbdata dir in run_*/ because we just save a symlink to @@ -210,6 +100,8 @@ def __init__(self, dbgym_workspace_path: Path): ) # `exist_ok` is False because we don't want to override a previous task run's data. self.dbgym_this_run_path.mkdir(parents=True, exist_ok=False) + # Break if it succeeds so we don't do it a second time. + break except FileExistsError: # In case we call task.py twice in one second, sleeping here will fix it. # Waiting one second is enough since we assume there's only one task.py running at a time. @@ -226,63 +118,63 @@ def __init__(self, dbgym_workspace_path: Path): # TODO(phw2): refactor our manual symlinking in postgres/cli.py to use link_result() instead def link_result( self, - result_fordpath: Path, + result_path: Path, custom_link_name: Optional[str] = None, ) -> Path: """ - result_fordpath must be a "result", meaning it was generated inside dbgym_workspace.dbgym_this_run_path. - Further, result_fordpath must have been generated by this invocation to task.py. This also means that - result_fordpath itself can be a file or a dir but not a symlink. + result_path must be a "result", meaning it was generated inside dbgym_workspace.dbgym_this_run_path. + Further, result_path must have been generated by this invocation to task.py. This also means that + result_path itself can be a file or a dir but not a symlink. Given a file or directory in task_runs/run_*/[codebase]/[org], this will create a symlink inside symlinks/[codebase]/[org]/. Will override the old symlink if there is one, so that symlinks/ always contains the latest generated version of a file. This function will return the path to the symlink that was created. 
""" - assert isinstance(result_fordpath, Path) + assert isinstance(result_path, Path) assert is_fully_resolved( - result_fordpath - ), f"result_fordpath ({result_fordpath}) should be a fully resolved path" + result_path + ), f"result_path ({result_path}) should be a fully resolved path" assert is_child_path( - result_fordpath, self.dbgym_this_run_path + result_path, self.dbgym_this_run_path ), "The result must have been generated in *this* run_*/ dir" - assert not os.path.islink(result_fordpath) + assert not os.path.islink(result_path) if type(custom_link_name) is str: link_name = custom_link_name else: - if os.path.isfile(result_fordpath): - link_name = basename_of_path(result_fordpath) + ".link" - elif os.path.isdir(result_fordpath): - link_name = basename_of_path(result_fordpath) + ".link" + if os.path.isfile(result_path): + link_name = name_to_linkname(basename_of_path(result_path)) + elif os.path.isdir(result_path): + link_name = name_to_linkname(basename_of_path(result_path)) else: - raise AssertionError("result_fordpath must be either a file or dir") + raise AssertionError("result_path must be either a file or dir") - symlink_parent_dpath = self.dbgym_symlinks_path / self.app_name - symlink_parent_dpath.mkdir(parents=True, exist_ok=True) + symlink_parent_path = self.dbgym_symlinks_path / self.app_name + symlink_parent_path.mkdir(parents=True, exist_ok=True) # Remove the old symlink ("old" meaning created in an earlier run) if there is one # Note that in a multi-threaded setting, this might remove one created by a process in the same run, # meaning it's not "old" by our definition of "old". However, we'll always end up with a symlink # file of the current run regardless of the order of threads. - assert link_name.endswith(".link") and not link_name.endswith( - ".link.link" + assert is_linkname( + link_name ), f'link_name ({link_name}) should end with ".link"' - symlink_path = symlink_parent_dpath / link_name + symlink_path = symlink_parent_path / link_name try_remove_file(symlink_path) - try_create_symlink(result_fordpath, symlink_path) + try_create_symlink(result_path, symlink_path) return symlink_path - def get_run_dpath_from_fpath(self, fpath: Path) -> Path: - run_dpath = fpath - while not parent_dpath_of_path(run_dpath).samefile(self.dbgym_runs_path): - run_dpath = parent_dpath_of_path(run_dpath) - return run_dpath + def get_run_path_from_path(self, path: Path) -> Path: + run_path = path + while not parent_path_of_path(run_path).samefile(self.dbgym_runs_path): + run_path = parent_path_of_path(run_path) + return run_path # TODO(phw2): really look at the clean PR to see what it changed # TODO(phw2): after merging agent-train, refactor some code in agent-train to use save_file() instead of open_and_save() - def save_file(self, fpath: Path) -> None: + def save_file(self, path: Path) -> None: """ If an external function takes in a file/directory as input, you will not be able to call open_and_save(). In these situations, just call save_file(). @@ -300,98 +192,83 @@ def save_file(self, fpath: Path) -> None: - If you save two dependencies with the same *outermost* directory, or two dependencies with the same filename both directly inside run_*/, the second save will overwrite the first. 
""" - # validate fpath - assert isinstance(fpath, Path) - assert not os.path.islink(fpath), f"fpath ({fpath}) should not be a symlink" - assert os.path.exists(fpath), f"fpath ({fpath}) does not exist" - assert os.path.isfile(fpath), f"fpath ({fpath}) is not a file" + # validate path + assert isinstance(path, Path) + assert not os.path.islink(path), f"path ({path}) should not be a symlink" + assert os.path.exists(path), f"path ({path}) does not exist" + assert os.path.isfile(path), f"path ({path}) is not a file" assert not is_child_path( - fpath, self.dbgym_this_run_path - ), f"fpath ({fpath}) was generated in this task run ({self.dbgym_this_run_path}). You do not need to save it" + path, self.dbgym_this_run_path + ), f"path ({path}) was generated in this task run ({self.dbgym_this_run_path}). You do not need to save it" # Save _something_ to dbgym_this_run_path. # Save a symlink if the opened file was generated by a run. This is for two reasons: # 1. Files or dirs generated by a run are supposed to be immutable so saving a symlink is safe. # 2. Files or dirs generated by a run may be very large (up to 100s of GBs) so we don't want to copy them. - if is_child_path(fpath, self.dbgym_runs_path): - # If the fpath file is directly in run_dpath, we symlink the file directly. - run_dpath = self.get_run_dpath_from_fpath(fpath) - parent_dpath = parent_dpath_of_path(fpath) - if parent_dpath.samefile(run_dpath): - fname = basename_of_path(fpath) - symlink_fpath = self.dbgym_this_run_path / (fname + ".link") - try_remove_file(symlink_fpath) - try_create_symlink(fpath, symlink_fpath) - # Otherwise, we know the fpath file is _not_ directly inside run_dpath dir. - # We go as far back as we can while still staying in run_dpath and symlink that "base" dir. - # This is because lots of runs create dirs within run_dpath and it creates too much clutter to symlink every individual file. + if is_child_path(path, self.dbgym_runs_path): + # If the path file is directly in run_path, we symlink the file directly. + run_path = self.get_run_path_from_path(path) + parent_path = parent_path_of_path(path) + if parent_path.samefile(run_path): + fname = basename_of_path(path) + symlink_path = self.dbgym_this_run_path / name_to_linkname(fname) + try_remove_file(symlink_path) + try_create_symlink(path, symlink_path) + # Otherwise, we know the path file is _not_ directly inside run_path dir. + # We go as far back as we can while still staying in run_path and symlink that "base" dir. + # This is because lots of runs create dirs within run_path and it creates too much clutter to symlink every individual file. # Further, this avoids an edge case where you both save a file and the dir it's in. else: - # Set base_dpath such that its parent is run_dpath. - base_dpath = parent_dpath - while not parent_dpath_of_path(base_dpath).samefile(run_dpath): - base_dpath = parent_dpath_of_path(base_dpath) + # Set base_path such that its parent is run_path. 
+ base_path = parent_path + while not parent_path_of_path(base_path).samefile(run_path): + base_path = parent_path_of_path(base_path) # Create symlink - open_base_dname = basename_of_path(base_dpath) - symlink_dpath = self.dbgym_this_run_path / (open_base_dname + ".link") - try_remove_file(symlink_dpath) - try_create_symlink(base_dpath, symlink_dpath) + open_base_dname = basename_of_path(base_path) + symlink_path = self.dbgym_this_run_path / name_to_linkname( + open_base_dname + ) + try_remove_file(symlink_path) + try_create_symlink(base_path, symlink_path) # If the file wasn't generated by a run, we can't just symlink it because we don't know that it's immutable. else: - fname = basename_of_path(fpath) + fname = basename_of_path(path) # In this case, we want to copy instead of symlinking since it might disappear in the future. - copy_fpath = self.dbgym_this_run_path / fname - shutil.copy(fpath, copy_fpath) - - # `append_group()` is used to mark the "codebase path" of an invocation of the CLI. The "codebase path" is explained further in the documentation. - def append_group(self, name: str) -> None: - self.cur_path_list.append(name) - - def cur_source_path(self, *dirs: str) -> Path: - cur_path = self.base_dbgym_repo_dpath - assert self.cur_path_list[0] == "dbgym" - for folder in self.cur_path_list[1:]: - cur_path = cur_path / folder - for dir in dirs: - cur_path = cur_path / dir - return cur_path - - def cur_symlinks_path(self, *dirs: str, mkdir: bool = False) -> Path: - flattened_structure = "_".join(self.cur_path_list) - cur_path = self.dbgym_symlinks_path / flattened_structure - for dir in dirs: - cur_path = cur_path / dir - if mkdir: - cur_path.mkdir(parents=True, exist_ok=True) - return cur_path - - def cur_task_runs_path(self, *dirs: str, mkdir: bool = False) -> Path: - flattened_structure = "_".join(self.cur_path_list) - cur_path = self.dbgym_this_run_path / flattened_structure - for dir in dirs: - cur_path = cur_path / dir - if mkdir: - cur_path.mkdir(parents=True, exist_ok=True) - return cur_path - - def cur_symlinks_bin_path(self, *dirs: str, mkdir: bool = False) -> Path: - return self.cur_symlinks_path("bin", *dirs, mkdir=mkdir) - - def cur_symlinks_build_path(self, *dirs: str, mkdir: bool = False) -> Path: - return self.cur_symlinks_path("build", *dirs, mkdir=mkdir) - - def cur_symlinks_data_path(self, *dirs: str, mkdir: bool = False) -> Path: - return self.cur_symlinks_path("data", *dirs, mkdir=mkdir) - - def cur_task_runs_build_path(self, *dirs: str, mkdir: bool = False) -> Path: - return self.cur_task_runs_path("build", *dirs, mkdir=mkdir) - - def cur_task_runs_data_path(self, *dirs: str, mkdir: bool = False) -> Path: - return self.cur_task_runs_path("data", *dirs, mkdir=mkdir) - - def cur_task_runs_artifacts_path(self, *dirs: str, mkdir: bool = False) -> Path: - return self.cur_task_runs_path("artifacts", *dirs, mkdir=mkdir) + copy_path = self.dbgym_this_run_path / fname + shutil.copy(path, copy_path) + + def open_and_save(self, open_path: Path, mode: str = "r") -> IO[Any]: + """ + Open a file and "save" it to [workspace]/task_runs/run_*/. + It takes in a str | Path to match the interface of open(). + This file does not work if open_path is a symlink, to make its interface identical to that of open(). + Make sure to resolve all symlinks with fully_resolve_path(). + To avoid confusion, I'm enforcing this function to only work with absolute paths. 
+ # TODO: maybe make it work on non-fully-resolved paths to better match open() + See the comment of save_file() for what "saving" means + If you are generating a "result" for the run, _do not_ use this. Just use the normal open(). + This shouldn't be too hard to remember because this function crashes if open_path doesn't exist, + and when you write results you're usually opening open_paths which do not exist. + """ + # Validate open_path + assert isinstance(open_path, Path) + assert is_fully_resolved( + open_path + ), f"open_and_save(): open_path ({open_path}) should be a fully resolved path" + assert not os.path.islink( + open_path + ), f"open_path ({open_path}) should not be a symlink" + assert os.path.exists(open_path), f"open_path ({open_path}) does not exist" + # `open_and_save`` *must* be called on files because it doesn't make sense to open a directory. note that this doesn't mean we'll always save + # a file though. we sometimes save a directory (see save_file() for details) + assert os.path.isfile(open_path), f"open_path ({open_path}) is not a file" + + # Save + self.save_file(open_path) + + # Open + return open(open_path, mode=mode) def get_workspace_path_from_config(dbgym_config_path: Path) -> Path: @@ -433,10 +310,10 @@ def fully_resolve_path(inputpath: os.PathLike[str]) -> Path: realabspath = Path(inputpath) # `expanduser()` is always "ok" to call first. realabspath = realabspath.expanduser() - # The reason we don't call Path.absolute() is because the path should be relative to get_base_dbgym_repo_dpath(), + # The reason we don't call Path.absolute() is because the path should be relative to get_base_dbgym_repo_path(), # which is not necessary where cwd() points at the time of calling this function. if not realabspath.is_absolute(): - realabspath = get_base_dbgym_repo_dpath() / realabspath + realabspath = get_base_dbgym_repo_path() / realabspath # `resolve()` has two uses: normalize the path (remove ..) and resolve symlinks. # I believe the pathlib library (https://docs.python.org/3/library/pathlib.html#pathlib.Path.resolve) does these together this # way to avoid an edge case related to symlinks and normalizing paths (footnote 1 of the linked docs) @@ -447,15 +324,15 @@ def fully_resolve_path(inputpath: os.PathLike[str]) -> Path: return realabspath -def get_base_dbgym_repo_dpath() -> Path: +def get_base_dbgym_repo_path() -> Path: path = Path(os.getcwd()) - assert _is_base_dbgym_repo_dpath( + assert _is_base_dbgym_repo_path( path ), "This script should be invoked from the root of the dbgym repo." return path -def _is_base_dbgym_repo_dpath(path: Path) -> bool: +def _is_base_dbgym_repo_path(path: Path) -> bool: """ Returns whether we are in the base directory of some git repository """ @@ -505,238 +382,81 @@ def is_fully_resolved(path: Path) -> bool: return str(resolved_path) == str(path) -def parent_dpath_of_path(dpath: Path) -> Path: +def parent_path_of_path(path: Path) -> Path: """ This function only calls Path.parent, but in a safer way. 
""" - assert isinstance(dpath, Path) + assert isinstance(path, Path) assert is_fully_resolved( - dpath - ), f"dpath must be fully resolved because Path.parent has weird behavior on non-resolved paths (see https://docs.python.org/3/library/pathlib.html#pathlib.PurePath.parent)" - parent_dpath = dpath.parent - assert isinstance(parent_dpath, Path) - return parent_dpath + path + ), f"path must be fully resolved because Path.parent has weird behavior on non-resolved paths (see https://docs.python.org/3/library/pathlib.html#pathlib.PurePath.parent)" + parent_path = path.parent + assert isinstance(parent_path, Path) + return parent_path -def basename_of_path(dpath: Path) -> str: +def basename_of_path(path: Path) -> str: """ This function only calls Path.name, but in a safer way. """ - assert isinstance(dpath, Path) + assert isinstance(path, Path) assert is_fully_resolved( - dpath - ), f'dpath must be fully resolved because Path.name has weird behavior on non-resolved paths (like giving ".." if the path ends with a "..")' - dpath_dirname, dpath_basename = os.path.split(dpath) + path + ), f'path must be fully resolved because Path.name has weird behavior on non-resolved paths (like giving ".." if the path ends with a "..")' + path_dirname, path_basename = os.path.split(path) # this means the path ended with a '/' so all os.path.split() does is get rid of the slash - if dpath_basename == "": - return os.path.basename(dpath_dirname) + if path_basename == "": + return os.path.basename(path_dirname) else: - return dpath_basename + return path_basename # TODO(phw2): refactor to use Path -def is_child_path(child_path: os.PathLike[str], parent_dpath: os.PathLike[str]) -> bool: +def is_child_path(child_path: os.PathLike[str], parent_path: os.PathLike[str]) -> bool: """ - Checks whether child_path refers to a file/dir/link that is a child of the dir referred to by parent_dpath + Checks whether child_path refers to a file/dir/link that is a child of the dir referred to by parent_path If the two paths are equal, this function returns FALSE """ - assert os.path.isdir(parent_dpath) - if os.path.samefile(child_path, parent_dpath): + assert os.path.isdir(parent_path) + if os.path.samefile(child_path, parent_path): return False else: return os.path.samefile( - os.path.commonpath([parent_dpath, child_path]), parent_dpath + os.path.commonpath([parent_path, child_path]), parent_path ) -def open_and_save( - dbgym_workspace: DBGymWorkspace, open_fpath: Path, mode: str = "r" -) -> IO[Any]: - """ - Open a file and "save" it to [workspace]/task_runs/run_*/. - It takes in a str | Path to match the interface of open(). - This file does not work if open_fpath is a symlink, to make its interface identical to that of open(). - Make sure to resolve all symlinks with fully_resolve_path(). - To avoid confusion, I'm enforcing this function to only work with absolute paths. - # TODO: maybe make it work on non-fully-resolved paths to better match open() - See the comment of save_file() for what "saving" means - If you are generating a "result" for the run, _do not_ use this. Just use the normal open(). - This shouldn't be too hard to remember because this function crashes if open_fpath doesn't exist, - and when you write results you're usually opening open_fpaths which do not exist. 
- """ - # validate open_fpath - assert isinstance(open_fpath, Path) - assert is_fully_resolved( - open_fpath - ), f"open_and_save(): open_fpath ({open_fpath}) should be a fully resolved path" - assert not os.path.islink( - open_fpath - ), f"open_fpath ({open_fpath}) should not be a symlink" - assert os.path.exists(open_fpath), f"open_fpath ({open_fpath}) does not exist" - # open_and_save *must* be called on files because it doesn't make sense to open a directory. note that this doesn't mean we'll always save - # a file though. we sometimes save a directory (see save_file() for details) - assert os.path.isfile(open_fpath), f"open_fpath ({open_fpath}) is not a file" - - # save - save_file(dbgym_workspace, open_fpath) - - # open - return open(open_fpath, mode=mode) - - -def extract_from_task_run_fordpath( - dbgym_workspace: DBGymWorkspace, task_run_fordpath: Path +def extract_from_task_run_path( + dbgym_workspace: DBGymWorkspace, task_run_path: Path ) -> tuple[Path, str, Path, str]: """ The task_runs/ folder is organized like task_runs/run_*/[codebase]/[org]/any/path/you/want. This function extracts the [codebase] and [org] components """ - assert isinstance(task_run_fordpath, Path) - assert not task_run_fordpath.is_symlink() - parent_dpath = task_run_fordpath.parent + assert isinstance(task_run_path, Path) + assert not task_run_path.is_symlink() + parent_path = task_run_path.parent # TODO(phw2): make this a common function - assert not parent_dpath.samefile( + assert not parent_path.samefile( dbgym_workspace.dbgym_runs_path - ), f"task_run_fordpath ({task_run_fordpath}) should be inside a run_*/ dir instead of directly in dbgym_workspace.dbgym_runs_path ({dbgym_workspace.dbgym_runs_path})" - assert not parent_dpath_of_path(parent_dpath).samefile( + ), f"task_run_path ({task_run_path}) should be inside a run_*/ dir instead of directly in dbgym_workspace.dbgym_runs_path ({dbgym_workspace.dbgym_runs_path})" + assert not parent_path_of_path(parent_path).samefile( dbgym_workspace.dbgym_runs_path - ), f"task_run_fordpath ({task_run_fordpath}) should be inside a run_*/[codebase]/ dir instead of directly in run_*/ ({dbgym_workspace.dbgym_runs_path})" - assert not parent_dpath_of_path(parent_dpath_of_path(parent_dpath)).samefile( + ), f"task_run_path ({task_run_path}) should be inside a run_*/[codebase]/ dir instead of directly in run_*/ ({dbgym_workspace.dbgym_runs_path})" + assert not parent_path_of_path(parent_path_of_path(parent_path)).samefile( dbgym_workspace.dbgym_runs_path - ), f"task_run_fordpath ({task_run_fordpath}) should be inside a run_*/[codebase]/[organization]/ dir instead of directly in run_*/ ({dbgym_workspace.dbgym_runs_path})" - # org_dpath is the run_*/[codebase]/[organization]/ dir that task_run_fordpath is in - org_dpath = parent_dpath - while not parent_dpath_of_path( - parent_dpath_of_path(parent_dpath_of_path(org_dpath)) + ), f"task_run_path ({task_run_path}) should be inside a run_*/[codebase]/[organization]/ dir instead of directly in run_*/ ({dbgym_workspace.dbgym_runs_path})" + # org_path is the run_*/[codebase]/[organization]/ dir that task_run_path is in + org_path = parent_path + while not parent_path_of_path( + parent_path_of_path(parent_path_of_path(org_path)) ).samefile(dbgym_workspace.dbgym_runs_path): - org_dpath = parent_dpath_of_path(org_dpath) - org_dname = basename_of_path(org_dpath) - codebase_dpath = parent_dpath_of_path(org_dpath) - codebase_dname = basename_of_path(codebase_dpath) + org_path = parent_path_of_path(org_path) + org_dname = 
basename_of_path(org_path) + codebase_path = parent_path_of_path(org_path) + codebase_dname = basename_of_path(codebase_path) - return codebase_dpath, codebase_dname, org_dpath, org_dname - - -# TODO(phw2): deprecate this once I'm done with unittest_workspace.py -def save_file(dbgym_workspace: DBGymWorkspace, fpath: Path) -> None: - """ - If an external function takes in a file/directory as input, you will not be able to call open_and_save(). - In these situations, just call save_file(). - Like open_and_save(), this function only works with real absolute paths. - "Saving" can mean either copying the file or creating a symlink to it - We copy the file if it is a "config", meaning it just exists without having been generated - We create a symlink if it is a "dependency", meaning a task.py command was run to generate it - In these cases we create a symlink so we have full provenance for how the dependency was created - """ - # validate fpath - assert is_fully_resolved(fpath), f"fpath ({fpath}) should be a fully resolved path" - assert os.path.isfile(fpath), f"fpath ({fpath}) should be a file" - assert not is_child_path( - fpath, dbgym_workspace.dbgym_this_run_path - ), f"fpath ({fpath}) was generated in this task run ({dbgym_workspace.dbgym_this_run_path}). You do not need to save it" - - # save _something_ to dbgym_this_run_path - # save a symlink if the opened file was generated by a run. this is for two reasons: - # 1. files or dirs generated by a run are supposed to be immutable so saving a symlink is safe - # 2. files or dirs generated by a run may be very large (up to 100s of GBs) so we don't want to copy them - if is_child_path(fpath, dbgym_workspace.dbgym_runs_path): - # get paths we'll need later. - _, codebase_dname, org_dpath, org_dname = extract_from_task_run_fordpath( - dbgym_workspace, fpath - ) - this_run_save_dpath = ( - dbgym_workspace.dbgym_this_run_path / codebase_dname / org_dname - ) - os.makedirs(this_run_save_dpath, exist_ok=True) - - # if the fpath file is directly in org_dpath, we symlink the file directly - parent_dpath = parent_dpath_of_path(fpath) - if parent_dpath.samefile(org_dpath): - fname = basename_of_path(fpath) - symlink_fpath = this_run_save_dpath / (fname + ".link") - try_create_symlink(fpath, symlink_fpath) - # else, we know the fpath file is _not_ directly inside org_dpath dir - # we go as far back as we can while still staying in org_dpath and symlink that "base" dir - # this is because lots of runs create dirs within org_dpath and it's just a waste of space to symlink every individual file - else: - # set base_dpath such that its parent is org_dpath - base_dpath = parent_dpath - while not parent_dpath_of_path(base_dpath).samefile(org_dpath): - base_dpath = parent_dpath_of_path(base_dpath) - - # create symlink - open_base_dname = basename_of_path(base_dpath) - symlink_dpath = this_run_save_dpath / (open_base_dname + ".link") - try_create_symlink(base_dpath, symlink_dpath) - # if it wasn't generated by a run - else: - # since we don't know where the file is at all, the location is "unknown" and the org is "all" - this_run_save_dpath = dbgym_workspace.dbgym_this_run_path / "unknown" / "all" - os.makedirs(this_run_save_dpath, exist_ok=True) - fname = basename_of_path(fpath) - # in this case, we want to copy instead of symlinking since it might disappear in the future - copy_fpath = this_run_save_dpath / fname - shutil.copy(fpath, copy_fpath) - - -# TODO(phw2): deprecate this once I'm done with unittest_workspace.py -def link_result( - dbgym_workspace: 
DBGymWorkspace, - result_fordpath: Path, - custom_result_name: Optional[str] = None, -) -> Path: - """ - result_fordpath must be a "result", meaning it was generated inside dbgym_workspace.dbgym_this_run_path. - Further, result_fordpath must have been generated by this invocation to task.py. This also means that - result_fordpath itself can be a file or a dir but not a symlink. - Given a file or directory in task_runs/run_*/[codebase]/[org], this will create a symlink inside - symlinks/[codebase]/[org]/. - Will override the old symlink if there is one, so that symlinks/ always contains the latest generated - version of a file. - This function will return the path to the symlink that was created. - """ - assert isinstance(result_fordpath, Path) - assert is_fully_resolved( - result_fordpath - ), f"result_fordpath ({result_fordpath}) should be a fully resolved path" - assert is_child_path(result_fordpath, dbgym_workspace.dbgym_this_run_path) - assert not os.path.islink(result_fordpath) - - if type(custom_result_name) is str: - result_name = custom_result_name - else: - if os.path.isfile(result_fordpath): - result_name = basename_of_path(result_fordpath) + ".link" - elif os.path.isdir(result_fordpath): - result_name = basename_of_path(result_fordpath) + ".link" - else: - raise AssertionError("result_fordpath must be either a file or dir") - - # Figure out the parent directory path of the symlink - codebase_dpath, codebase_dname, _, org_dname = extract_from_task_run_fordpath( - dbgym_workspace, result_fordpath - ) - # We're only supposed to save files generated by us, which means they should be in cur_task_runs_path() - assert codebase_dpath.samefile( - dbgym_workspace.cur_task_runs_path() - ), f"link_result should only be called on files generated by this invocation to task.py" - symlink_parent_dpath = ( - dbgym_workspace.dbgym_symlinks_path / codebase_dname / org_dname - ) - symlink_parent_dpath.mkdir(parents=True, exist_ok=True) - - # Remove the old symlink ("old" meaning created in an earlier run) if there is one - # Note that in a multi-threaded setting, this might remove one created by a process in the same run, - # meaning it's not "old" by our definition of "old". However, we'll always end up with a symlink - # file of the current run regardless of the order of threads. - assert result_name.endswith(".link") and not result_name.endswith( - ".link.link" - ), f'result_name ({result_name}) should end with ".link"' - symlink_path = symlink_parent_dpath / result_name - try_remove_file(symlink_path) - try_create_symlink(result_fordpath, symlink_path) - - return symlink_path + return codebase_path, codebase_dname, org_path, org_dname def try_create_symlink(src_path: Path, dst_path: Path) -> None: @@ -744,7 +464,7 @@ def try_create_symlink(src_path: Path, dst_path: Path) -> None: Our functions that create symlinks might be called by multiple processes at once during HPO. Thus, this is a thread-safe way to create a symlink. """ - assert dst_path.suffix == ".link" + assert is_linkname(dst_path.name) try: os.symlink(src_path, dst_path) except FileExistsError: @@ -764,43 +484,6 @@ def try_remove_file(path: Path) -> None: pass -# TODO: move this stuff to shell.py -def restart_ray(redis_port: int) -> None: - """ - Stop and start Ray. 
-    This is good to do between each stage to avoid bugs from carrying over across stages
-    """
-    subprocess_run("ray stop -f")
-    ncpu = os.cpu_count()
-    # --disable-usage-stats avoids a Y/N prompt
-    subprocess_run(
-        f"OMP_NUM_THREADS={ncpu} ray start --head --port={redis_port} --num-cpus={ncpu} --disable-usage-stats"
-    )
-
-
-def make_redis_started(port: int) -> None:
-    """
-    Start Redis if it's not already started.
-    Note that Ray uses Redis but does *not* use this function. It starts Redis on its own.
-    One current use for this function to start/stop Redis for Boot.
-    """
-    try:
-        r = redis.Redis(port=port)
-        r.ping()
-        # This means Redis is running, so we do nothing
-        do_start_redis = False
-    except (redis.ConnectionError, redis.TimeoutError):
-        # This means Redis is not running, so we start it
-        do_start_redis = True
-
-    # I'm starting Redis outside of except so that errors in r.ping get propagated correctly
-    if do_start_redis:
-        subprocess_run(f"redis-server --port {port} --daemonize yes")
-        # When you start Redis in daemon mode, it won't let you know if it's started, so we ping again to check
-        r = redis.Redis(port=port)
-        r.ping()
-
-
 def is_ssd(path: Path) -> bool:
     try:
         device = (