Skip to content

Commit

Permalink
Fully using new workspace (#67)
Browse files Browse the repository at this point in the history
**Summary**: The codebase is now fully refactored to use the new
`open_and_save()`, `save_file()`, and `link_result()` functions from
#66. The old functions have been
removed.

**Demo**:
Wrote two new integration test files that pass.
<img width="905" alt="Screenshot 2024-12-30 at 10 45 53"
src="https://github.com/user-attachments/assets/ca197fe1-1a5b-4255-9ec7-515c7687caf1"
/>
<img width="1076" alt="Screenshot 2024-12-30 at 10 50 10"
src="https://github.com/user-attachments/assets/a621b5eb-4619-4e27-a2a7-85c8a8ff501c"
/>


**Details**
* Refactored `dbms/` and `benchmark/` to use the new files. Wrote tests
for both too.
* Moved path functions from `workspace.py` to
`gymlib/symlinks_paths.py`.
* This is important because agents will need access to the
DBMS/benchmark paths.
* Renamed all occurrences of `dpath/fpath/fordpath` to `path`. Renamed
`dname/fname` to `dirname/filename`.
* A bare name could be mistaken for a conceptual name, so we add `dir`/`file` to the
front to disambiguate.
* Paths are not ambiguous, though, so we just call them paths. Whether a path
points to a directory or a file matters a little, but it's cleaner to call
them all paths, since that's the general standard.
  • Loading branch information
wangpatrick57 authored Dec 30, 2024
1 parent 8511688 commit 34a47c0
Show file tree
Hide file tree
Showing 37 changed files with 1,006 additions and 985 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
# Integration tests do require external systems to be running (most commonly a database instance).
# Unlike end-to-end tests though, they test a specific module in a detailed manner, much like a unit test does.
env:
# We set `INTENDED_DBDATA_HARDWARE` so that it's seen when `integtest_pg_conn.py` executes `_set_up_gymlib_integtest_workspace.sh`.
# The CI runs on ssd so we have to set this.
INTENDED_DBDATA_HARDWARE: ssd
run: |
. "$HOME/.cargo/env"
Expand Down
2 changes: 1 addition & 1 deletion benchmark/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
@click.group(name="benchmark")
@click.pass_obj
def benchmark_group(dbgym_workspace: DBGymWorkspace) -> None:
dbgym_workspace.append_group("benchmark")
pass


benchmark_group.add_command(tpch_group)
Expand Down
125 changes: 74 additions & 51 deletions benchmark/job/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@
from typing import Optional

import click
from gymlib.symlinks_paths import (
get_tables_dirname,
get_workload_dirname,
get_workload_suffix,
name_to_linkname,
)

from benchmark.constants import DEFAULT_SCALE_FACTOR
from util.log import DBGYM_LOGGER_NAME
from util.shell import subprocess_run
from util.workspace import (
DBGymWorkspace,
get_default_tables_dname,
get_workload_name,
is_fully_resolved,
link_result,
)
from util.workspace import DBGymWorkspace, fully_resolve_path

JOB_TABLES_URL = "https://event.cwi.nl/da/job/imdb.tgz"
JOB_QUERIES_URL = "https://event.cwi.nl/da/job/job.tgz"
Expand Down Expand Up @@ -137,18 +137,22 @@
@click.group(name="job")
@click.pass_obj
def job_group(dbgym_workspace: DBGymWorkspace) -> None:
dbgym_workspace.append_group("job")
pass


@job_group.command(name="data")
@job_group.command(name="tables")
# We expose this option to keep its interface consistent with other workloads, but you should never pass in something other than DEFAULT_SCALE_FACTOR.
@click.argument("scale-factor", type=float)
@click.pass_obj
# The reason generate data is separate from create dbdata is because generate-data is generic
# The reason generate data is separate from create dbdata is because generate data is generic
# to all DBMSs while create dbdata is specific to a single DBMS.
def job_data(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None:
def job_tables(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None:
_job_tables(dbgym_workspace, scale_factor)


def _job_tables(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None:
assert scale_factor == DEFAULT_SCALE_FACTOR
_download_job_data(dbgym_workspace)
_download_job_tables(dbgym_workspace)


@job_group.command(name="workload")
Expand All @@ -161,18 +165,24 @@ def job_data(dbgym_workspace: DBGymWorkspace, scale_factor: float) -> None:
@click.pass_obj
def job_workload(
dbgym_workspace: DBGymWorkspace, query_subset: str, scale_factor: float
) -> None:
_job_workload(dbgym_workspace, query_subset, scale_factor)


def _job_workload(
dbgym_workspace: DBGymWorkspace, query_subset: str, scale_factor: float
) -> None:
assert scale_factor == DEFAULT_SCALE_FACTOR
_download_job_queries(dbgym_workspace)
_generate_job_workload(dbgym_workspace, query_subset)


def _download_job_data(dbgym_workspace: DBGymWorkspace) -> None:
def _download_job_tables(dbgym_workspace: DBGymWorkspace) -> None:
_download_and_untar_dir(
dbgym_workspace,
JOB_TABLES_URL,
"imdb.tgz",
get_default_tables_dname(DEFAULT_SCALE_FACTOR),
get_tables_dirname("job", DEFAULT_SCALE_FACTOR),
)


Expand All @@ -199,51 +209,66 @@ def _download_and_untar_dir(
an "original" directory name. If this is the case, you should set
`untarred_original_dname` to ensure that it gets renamed to `untarred_dname`.
"""
expected_symlink_dpath = (
dbgym_workspace.cur_symlinks_data_path(mkdir=True) / f"{untarred_dname}.link"
expected_symlink_path = (
dbgym_workspace.dbgym_cur_symlinks_path / f"{untarred_dname}.link"
)
if expected_symlink_dpath.exists():
if expected_symlink_path.exists():
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Skipping download: {expected_symlink_dpath}"
f"Skipping download: {expected_symlink_path}"
)
return

logging.getLogger(DBGYM_LOGGER_NAME).info(f"Downloading: {expected_symlink_dpath}")
real_data_path = dbgym_workspace.cur_task_runs_data_path(mkdir=True)
subprocess_run(f"curl -O {download_url}", cwd=real_data_path)
untarred_data_dpath = dbgym_workspace.cur_task_runs_data_path(untarred_dname)
logging.getLogger(DBGYM_LOGGER_NAME).info(f"Downloading: {expected_symlink_path}")
subprocess_run(f"curl -O {download_url}", cwd=dbgym_workspace.dbgym_this_run_path)
untarred_data_path = dbgym_workspace.dbgym_this_run_path / untarred_dname

if untarred_original_dname is not None:
assert not untarred_data_dpath.exists()
subprocess_run(f"tar -zxvf {download_tarred_fname}", cwd=real_data_path)
assert (real_data_path / untarred_original_dname).exists()
assert not untarred_data_path.exists()
subprocess_run(
f"tar -zxvf {download_tarred_fname}",
cwd=dbgym_workspace.dbgym_this_run_path,
)
assert (dbgym_workspace.dbgym_this_run_path / untarred_original_dname).exists()
subprocess_run(
f"mv {untarred_original_dname} {untarred_dname}", cwd=real_data_path
f"mv {untarred_original_dname} {untarred_dname}",
cwd=dbgym_workspace.dbgym_this_run_path,
)
else:
untarred_data_dpath.mkdir(parents=True, exist_ok=False)
subprocess_run(f"tar -zxvf ../{download_tarred_fname}", cwd=untarred_data_dpath)
untarred_data_path.mkdir(parents=True, exist_ok=False)
subprocess_run(f"tar -zxvf ../{download_tarred_fname}", cwd=untarred_data_path)

assert untarred_data_dpath.exists()
subprocess_run(f"rm {download_tarred_fname}", cwd=real_data_path)
symlink_dpath = link_result(dbgym_workspace, untarred_data_dpath)
assert expected_symlink_dpath.samefile(symlink_dpath)
logging.getLogger(DBGYM_LOGGER_NAME).info(f"Downloaded: {expected_symlink_dpath}")
assert untarred_data_path.exists()
subprocess_run(
f"rm {download_tarred_fname}", cwd=dbgym_workspace.dbgym_this_run_path
)
symlink_path = dbgym_workspace.link_result(untarred_data_path)
assert expected_symlink_path.samefile(symlink_path)
logging.getLogger(DBGYM_LOGGER_NAME).info(f"Downloaded: {expected_symlink_path}")


def _generate_job_workload(
dbgym_workspace: DBGymWorkspace,
query_subset: str,
) -> None:
workload_name = get_workload_name(DEFAULT_SCALE_FACTOR, query_subset)
expected_workload_symlink_dpath = dbgym_workspace.cur_symlinks_data_path(
mkdir=True
) / (workload_name + ".link")
workload_name = get_workload_dirname(
"job",
DEFAULT_SCALE_FACTOR,
get_workload_suffix("job", query_subset=query_subset),
)
expected_workload_symlink_path = dbgym_workspace.dbgym_cur_symlinks_path / (
name_to_linkname(workload_name)
)
if expected_workload_symlink_path.exists():
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Skipping generation: {expected_workload_symlink_path}"
)
return

logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Generating: {expected_workload_symlink_dpath}"
f"Generating: {expected_workload_symlink_path}"
)
real_dpath = dbgym_workspace.cur_task_runs_data_path(workload_name, mkdir=True)
workload_path = dbgym_workspace.dbgym_this_run_path / workload_name
workload_path.mkdir(parents=False, exist_ok=False)

query_names = None
if query_subset == "all":
Expand All @@ -255,19 +280,17 @@ def _generate_job_workload(
else:
assert False

with open(real_dpath / "order.txt", "w") as f:
with open(workload_path / "order.txt", "w") as f:
queries_parent_path = dbgym_workspace.dbgym_cur_symlinks_path / (
name_to_linkname(JOB_QUERIES_DNAME)
)

for qname in query_names:
sql_fpath = (
dbgym_workspace.cur_symlinks_data_path(mkdir=True)
/ (f"{JOB_QUERIES_DNAME}.link")
).resolve() / f"{qname}.sql"
assert is_fully_resolved(
sql_fpath
), "We should only write existent real absolute paths to a file"
f.write(f"Q{qname},{sql_fpath}\n")
sql_path = fully_resolve_path(queries_parent_path / f"{qname}.sql")
f.write(f"Q{qname},{sql_path}\n")

workload_symlink_dpath = link_result(dbgym_workspace, real_dpath)
assert workload_symlink_dpath == expected_workload_symlink_dpath
workload_symlink_path = dbgym_workspace.link_result(workload_path)
assert workload_symlink_path == expected_workload_symlink_path
logging.getLogger(DBGYM_LOGGER_NAME).info(
f"Generated: {expected_workload_symlink_dpath}"
f"Generated: {expected_workload_symlink_path}"
)
49 changes: 20 additions & 29 deletions benchmark/job/load_info.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
from pathlib import Path
from typing import Optional

from gymlib.symlinks_paths import get_tables_symlink_path

from benchmark.constants import DEFAULT_SCALE_FACTOR
from dbms.load_info_base_class import LoadInfoBaseClass
from util.workspace import DBGymWorkspace, get_default_tables_dname, is_fully_resolved
from util.workspace import DBGymWorkspace, fully_resolve_path

JOB_SCHEMA_FNAME = "job_schema.sql"


class JobLoadInfo(LoadInfoBaseClass):
CODEBASE_PATH_COMPONENTS = ["dbgym", "benchmark", "job"]
CODEBASE_DNAME = "_".join(CODEBASE_PATH_COMPONENTS)
TABLES = [
"aka_name",
"aka_title",
Expand All @@ -36,43 +36,34 @@ class JobLoadInfo(LoadInfoBaseClass):
]

def __init__(self, dbgym_workspace: DBGymWorkspace):
# schema and constraints
schema_root_dpath = dbgym_workspace.base_dbgym_repo_dpath
for component in JobLoadInfo.CODEBASE_PATH_COMPONENTS[
1:
]: # [1:] to skip "dbgym"
schema_root_dpath /= component
self._schema_fpath = schema_root_dpath / JOB_SCHEMA_FNAME
# Schema (directly in the codebase).
job_codebase_path = dbgym_workspace.base_dbgym_repo_path / "benchmark" / "job"
self._schema_path = job_codebase_path / JOB_SCHEMA_FNAME
assert (
self._schema_fpath.exists()
), f"self._schema_fpath ({self._schema_fpath}) does not exist"
self._schema_path.exists()
), f"self._schema_path ({self._schema_path}) does not exist"

# Tables
data_root_dpath = (
dbgym_workspace.dbgym_symlinks_path / JobLoadInfo.CODEBASE_DNAME / "data"
)
tables_symlink_dpath = (
data_root_dpath / f"{get_default_tables_dname(DEFAULT_SCALE_FACTOR)}.link"
tables_path = fully_resolve_path(
get_tables_symlink_path(
dbgym_workspace.dbgym_workspace_path, "job", DEFAULT_SCALE_FACTOR
)
)
tables_dpath = tables_symlink_dpath.resolve()
assert is_fully_resolved(
tables_dpath
), f"tables_dpath ({tables_dpath}) should be an existent real absolute path. Make sure you have generated the TPC-H data"
self._tables_and_fpaths = []
self._tables_and_paths = []
for table in JobLoadInfo.TABLES:
table_fpath = tables_dpath / f"{table}.csv"
self._tables_and_fpaths.append((table, table_fpath))
table_path = tables_path / f"{table}.csv"
self._tables_and_paths.append((table, table_path))

def get_schema_fpath(self) -> Path:
return self._schema_fpath
def get_schema_path(self) -> Path:
return self._schema_path

def get_tables_and_fpaths(self) -> list[tuple[str, Path]]:
return self._tables_and_fpaths
def get_tables_and_paths(self) -> list[tuple[str, Path]]:
return self._tables_and_paths

def get_table_file_delimiter(self) -> str:
return ","

def get_constraints_fpath(self) -> Optional[Path]:
def get_constraints_path(self) -> Optional[Path]:
# JOB does not have any constraints. It does have indexes, but we don't want to create
# those indexes so that the tuning agent can start from a clean slate.
return None
Empty file added benchmark/tests/__init__.py
Empty file.
1 change: 1 addition & 0 deletions benchmark/tests/benchmark_integtest_dbgym_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
dbgym_workspace_path: ../dbgym_benchmark_integtest_workspace/
Loading

0 comments on commit 34a47c0

Please sign in to comment.