chore: Update experiment configs and bugfixes #631

Draft
wants to merge 16 commits into base: main
Update configs
robinholzi committed Sep 15, 2024
commit 0f9c6025abd34a60e35183a23586922663955627
experiments/arxiv/compare_trigger_policies/pipeline_config.py
@@ -51,7 +51,7 @@ def gen_pipeline_config(
dataloader_workers=1,
use_previous_model=True,
initial_model="random",
batch_size=256,
batch_size=128, # GPU memory limit doesn't allow for larger batch sizes
shuffle=True,
optimizers=[
OptimizerConfig(
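
The halved batch size is dictated by GPU memory. A minimal, self-contained way to check whether a candidate batch size still fits (a hypothetical helper, not part of Modyn; the model and batch are placeholders):

import torch

def peak_gpu_memory_mb(model: torch.nn.Module, batch: torch.Tensor, device: str = "cuda:0") -> float:
    # One forward/backward pass, then report the peak allocation in MiB.
    model = model.to(device)
    torch.cuda.reset_peak_memory_stats(device)
    loss = model(batch.to(device)).sum()
    loss.backward()
    return torch.cuda.max_memory_allocated(device) / 1024**2

Comparing the result for 128 versus 256 rows against the card's capacity shows whether the larger batch would have fit.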
30 changes: 17 additions & 13 deletions experiments/arxiv/compare_trigger_policies/run.py
@@ -67,13 +67,15 @@ def construct_periodic_eval_handlers(


def construct_between_trigger_eval_handler(execution_time: EvalHandlerExecutionTime = "manual") -> list[EvalHandlerConfig]:
return EvalHandlerConfig(
name="full",
execution_time=execution_time,
models="active",
strategy=BetweenTwoTriggersEvalStrategyConfig(),
datasets=["arxiv_kaggle_all"], # train and test
)
return [
EvalHandlerConfig(
name="full",
execution_time=execution_time,
models="active",
strategy=BetweenTwoTriggersEvalStrategyConfig(),
datasets=["arxiv_kaggle_all"], # train and test
)
]


def construct_pipelines(experiment: Experiment) -> list[ModynPipelineConfig]:
@@ -103,8 +105,6 @@ def construct_pipelines(experiment: Experiment) -> list[ModynPipelineConfig]:
# 1X: Baselines with PERIODIC_EVAL_INTERVAL, executed with cautious #
# parallelism and post factum evaluation (bottlenecking) #
# -------------------------------------------------------------------------------- #
# TODO: merge main
# TODO: reset datasets in db
# time baselines
10: Experiment(
name="arxiv-baseline-time",
@@ -114,9 +114,11 @@ def construct_pipelines(experiment: Experiment) -> list[ModynPipelineConfig]:
),
time_triggers={
schedule: TimeTriggerConfig(every=schedule, start_timestamp=_FIRST_TIMESTAMP)
for schedule in reversed(["52w", "2y", "5y", "10y"]) # TODO: add 1y
for schedule in reversed(["26w", "10y"])
# 0: "1y", "2y", "5y"
# 1: "26w", "10y"
},
gpu_device="cuda:0",
gpu_device="cuda:2",
),
# data amount baselines
11: Experiment(
@@ -127,9 +129,11 @@ def construct_pipelines(experiment: Experiment) -> list[ModynPipelineConfig]:
),
data_amount_triggers={
f"{num_samples}": DataAmountTriggerConfig(num_samples=num_samples)
for num_samples in reversed([50_000, 100_000, 500_000]) # TODO: add 25_000
for num_samples in reversed([25_000, 50_000])
# 2: 100_000, 500_000, 1_000_000
# 3: 25_000, 50_000
},
gpu_device="cuda:1",
gpu_device="cuda:3",
),
# -------------------------------------------------------------------------------- #
# 2X: Drift triggers #
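
The reversed(...) in the two trigger dict comprehensions only affects construction order, so the coarsest schedule or smallest sample count listed last is built first; a tiny illustration with plain strings standing in for the trigger configs:

# Dict insertion order follows the reversed list, so "10y" precedes "26w".
schedules = {schedule: f"every {schedule}" for schedule in reversed(["26w", "10y"])}
print(list(schedules))  # ['10y', '26w']

Since construct_between_trigger_eval_handler now returns a list, its result can presumably be concatenated with the output of construct_periodic_eval_handlers when assembling a pipeline's eval handlers.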
17 changes: 5 additions & 12 deletions experiments/huffpost/compare_trigger_policies/pipeline_config.py
@@ -51,22 +51,15 @@ def gen_pipeline_config(
dataloader_workers=1,
use_previous_model=True,
initial_model="random",
batch_size=256,
batch_size=128, # GPU memory limit doesn't allow for larger batch sizes
shuffle=True,
optimizers=[
OptimizerConfig(
name="default",
algorithm="SGD",
algorithm="AdamW",
source="PyTorch",
param_groups=[
OptimizerParamGroup(
name="default",
algorithm="AdamW",
source="PyTorch",
param_groups=[
OptimizerParamGroup(module="model", config={"lr": 0.00002, "weight_decay": 0.01})
],
)
OptimizerParamGroup(module="model", config={"lr": 0.00002, "weight_decay": 0.01})
],
)
],
@@ -88,8 +81,8 @@ def gen_pipeline_config(
evaluation=EvaluationConfig(
handlers=eval_handlers,
device=gpu_device,
after_training_evaluation_workers=8,
after_pipeline_evaluation_workers=8,
after_training_evaluation_workers=2, # one worker needs 8-9 GB of memory
after_pipeline_evaluation_workers=2,
datasets=[
EvalDataConfig(
dataset_id=hp_dataset_name,
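
For orientation, the simplified optimizer block corresponds roughly to the plain PyTorch setup below; the exact mapping Modyn applies to the config is assumed here, and the Linear layer is only a placeholder for the module named "model" in the param group:

import torch

model = torch.nn.Linear(768, 2)  # placeholder standing in for the real "model" module
optimizer = torch.optim.AdamW(model.parameters(), lr=0.00002, weight_decay=0.01)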
30 changes: 21 additions & 9 deletions modyn/supervisor/internal/pipeline_executor/evaluation_executor.py
@@ -176,9 +176,15 @@ def run_pipeline_evaluations(
logs = self._launch_evaluations_async(eval_requests, log, num_workers)
return logs

def run_post_pipeline_evaluations(self, manual_run: bool = False) -> SupervisorLogs:
def run_post_pipeline_evaluations(self, manual_run: bool = False, num_workers: int | None = None) -> SupervisorLogs:
"""Evaluate the trained models after the core pipeline and store the
results."""
results.

Args:
manual_run: If True, only the evaluations that are marked as manual will be executed.
num_workers: The number of workers to use for the evaluations. If None, the number of workers will be
determined by the pipeline configuration.
"""
if not self.pipeline.evaluation:
return SupervisorLogs(stage_runs=[])

@@ -224,7 +230,9 @@ def run_post_pipeline_evaluations(self, manual_run: bool = False) -> SupervisorL
sample_time=-1,
trigger_idx=-1,
),
num_workers=self.pipeline.evaluation.after_pipeline_evaluation_workers,
num_workers=(
num_workers if num_workers else self.pipeline.evaluation.after_pipeline_evaluation_workers
)
)
return logs

@@ -420,17 +428,17 @@ def get_failure_reason(eval_aborted_reason: EvaluationAbortedReason) -> str:
# ------------------------------------------------------------------------------------ #


def eval_executor_single_pipeline(pipeline_dir: Path) -> SupervisorLogs:
def eval_executor_single_pipeline(pipeline_dir: Path, num_workers: int | None) -> SupervisorLogs:
# restart evaluation executor
ex = EvaluationExecutor.init_from_path(pipeline_dir)

supervisor_eval_logs = ex.run_post_pipeline_evaluations(manual_run=True)
supervisor_eval_logs = ex.run_post_pipeline_evaluations(manual_run=True, num_workers=num_workers)
logger.info("Done with manual evaluation.")

return supervisor_eval_logs


def eval_executor_multi_pipeline(pipelines_dir: Path, pids: list[int] | None = None) -> None:
def eval_executor_multi_pipeline(pipelines_dir: Path, num_workers: int | None, pids: list[int] | None = None) -> None:
"""Run the evaluation executor for multiple pipelines."""
faulty_dir = pipelines_dir / "_faulty"
done_dir = pipelines_dir / "_done"
@@ -454,7 +462,7 @@ def eval_executor_multi_pipeline(pipelines_dir: Path, pids: list[int] | None = N

(finished_dir / p_dir.stem).mkdir(exist_ok=True)

supervisor_eval_logs = eval_executor_single_pipeline(p_dir)
supervisor_eval_logs = eval_executor_single_pipeline(p_dir, num_workers=num_workers)

shutil.copytree(p_dir, done_dir / p_dir.stem, dirs_exist_ok=True)
full_logs = PipelineLogs.model_validate_json((done_dir / p_dir.stem / "pipeline.log").read_text())
@@ -473,11 +481,15 @@ def eval_executor_multi_pipeline(pipelines_dir: Path, pids: list[int] | None = N
print("Path not found")
sys.exit(1)

num_workers = int(input("Enter number of workers (<= 0 will use the pipeline default): "))
if num_workers <= 0:
num_workers = None

if single_pipeline_mode.lower() == "y":
p_id = int(input("Enter pipeline id: "))
eval_executor_multi_pipeline(userpath, [p_id])
eval_executor_multi_pipeline(userpath, num_workers=num_workers, pids=[p_id])
elif single_pipeline_mode.lower() == "n":
eval_executor_multi_pipeline(userpath)
eval_executor_multi_pipeline(userpath, num_workers=num_workers)
else:
print("Invalid input")
sys.exit(1)
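
A sketch of how the new num_workers override is intended to be used via the helpers defined above (paths and pipeline ids are placeholders):

from pathlib import Path

# Re-run post-pipeline evaluations for a single pipeline with two evaluation workers:
logs = eval_executor_single_pipeline(Path("/path/to/pipelines/42"), num_workers=2)

# Process several pipeline directories; num_workers=None falls back to each pipeline's
# configured after_pipeline_evaluation_workers.
eval_executor_multi_pipeline(Path("/path/to/pipelines"), num_workers=None, pids=[42, 43])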