Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maintenance/checkpointing #130

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
255fa35
removed pickle based checkpointing
May 9, 2024
ad2edaa
basic checkpointing for just the propulator
May 9, 2024
849b126
added limits member to the propagator at the top of the hierarchy as …
May 11, 2024
a91bfb1
added limits member to the cmaes propagator
May 11, 2024
620f5da
updated the signature of the propagator in the propagator factory
May 11, 2024
804ee71
updated logging for tests and added pso checkpointing test
May 12, 2024
0e448fb
fixed logging output during tests and added a surrogate checkpointing…
May 12, 2024
4dd75d9
the checkpoint test also resumes the run from the checkpoint
May 12, 2024
63149d0
added hdf5 checkpointing, tested only for single sequential worker
May 12, 2024
4bf06bb
removed debug print
May 12, 2024
695d7f8
added checkpointing to island models
May 12, 2024
2a911c5
added some more asserts and more verbose output
May 14, 2024
c914c98
fixed checkpoint file communicator
May 14, 2024
03bb0f0
fixed off by one error in generations when loading checkpoint, remove…
May 14, 2024
2070a57
removed comment and debug output
May 14, 2024
cdc3e4d
comments, generations, debugging stuff
May 15, 2024
acd9de8
removed some debug messages and added migration updates to the checkp…
May 15, 2024
e30b2f5
fixed comm
May 15, 2024
d516b3f
fixed mistake in checkpoint reading, fixed comm, used individual isla…
May 15, 2024
b9056e4
fixed pre-commit issues, updated installation instructions, excluded …
oskar-taubert Aug 4, 2024
03289f3
fixed ruff errors, that were not found in the pre-commit hook for som…
oskar-taubert Aug 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ repos:
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
exclude: 'coverage'
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/astral-sh/ruff-pre-commit
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ discussions](https://github.com/Helmholtz-AI-Energy/propulate/discussions) :octo

## Installation

- Working installations of MPI and HDF5 are required.
- ``Propulate`` depends on parallel HDF5. Try ``CC="mpicc" HDF5_MPI="ON" pip install --no-binary=h5py h5py`` to install the parallel-enabled Python bindings, or follow the [h5py build instructions](https://docs.h5py.org/en/stable/build.html#install).
- You can install the **latest stable release** from PyPI: ``pip install propulate``
- If you need the **latest updates**, you can also install ``Propulate`` directly from the master branch.
Pull and run ``pip install .``.
Expand Down
1 change: 0 additions & 1 deletion propulate/_globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@
SYNCHRONIZATION_TAG = 2
INIT_TAG = 3
POPULATION_TAG = 4
DUMP_TAG = 5
MIGRATION_TAG = 6
88 changes: 44 additions & 44 deletions propulate/migrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pathlib import Path
from typing import Callable, Generator, List, Optional, Type, Union

import h5py
import numpy as np
from mpi4py import MPI

Expand Down Expand Up @@ -123,7 +124,7 @@ def __init__(
Individual
] = [] # Emigrated individuals to be deactivated on sending island

def _send_emigrants(self) -> None:
def _send_emigrants(self, hdf5_checkpoint: h5py.File) -> None:
"""Perform migration, i.e. island sends individuals out to other islands."""
log_string = (
f"Island {self.island_idx} Worker {self.island_comm.rank} "
Expand Down Expand Up @@ -188,6 +189,10 @@ def _send_emigrants(self) -> None:

# Send emigrants to target island.
departing = copy.deepcopy(emigrants)
for ind in departing:
hdf5_checkpoint[f"{ind.island}"][f"{ind.island_rank}"][
"active_on_island"
][ind.generation, self.island_idx] = False
# Determine new responsible worker on target island.
for ind in departing:
ind.current = self.rng.randrange(0, count)
Expand Down Expand Up @@ -236,7 +241,7 @@ def _send_emigrants(self) -> None:
f"to select {num_emigrants} migrants."
)

def _receive_immigrants(self) -> None:
def _receive_immigrants(self, hdf5_checkpoint: h5py.File) -> None:
"""
Check for and possibly receive immigrants sent by other islands.

Expand Down Expand Up @@ -284,6 +289,9 @@ def _receive_immigrants(self) -> None:
log_string
+ f"Identical immigrant {immigrant} already active on target island {self.island_idx}."
)
hdf5_checkpoint[f"{immigrant.island}"][f"{immigrant.island_rank}"][
"active_on_island"
][immigrant.generation] = True
self.population.append(
copy.deepcopy(immigrant)
) # Append immigrant to population.
Expand Down Expand Up @@ -421,53 +429,51 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None:
if self.propulate_comm is None:
while self.generations <= -1 or self.generation < self.generations:
# Breed and evaluate individual.
self._evaluate_individual()
# TODO this should be refactored, the subworkers don't need the logfile
# TODO this needs to be addressed before merge, since multirank workers should fail with this
self._evaluate_individual(None)
self.generation += 1
return

if self.island_comm.rank == 0:
log.info(f"Island {self.island_idx} has {self.island_comm.size} workers.")

dump = True if self.island_comm.rank == 0 else False
migration = True if self.migration_prob > 0 else False
self.propulate_comm.barrier()

# Loop over generations.
while self.generations <= -1 or self.generation < self.generations:
if self.generation % int(logging_interval) == 0:
log.info(
f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..."
)

# Breed and evaluate individual.
self._evaluate_individual()

# Check for and possibly receive incoming individuals from other intra-island workers.
self._receive_intra_island_individuals()
# TODO this should probably be refactored, checkpointing can probably be handled in one place
with h5py.File(
self.checkpoint_path, "a", driver="mpio", comm=self.propulate_comm
) as f:
while self.generation < self.generations:
if self.generation % int(logging_interval) == 0:
log.info(
f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..."
)

# Migration.
if migration:
# Emigration: Island sends individuals out.
# Happens on per-worker basis with certain probability.
if self.rng.random() < self.migration_prob:
self._send_emigrants()
# Breed and evaluate individual.
self._evaluate_individual(f)

# Immigration: Check for incoming individuals from other islands.
self._receive_immigrants()
# Check for and possibly receive incoming individuals from other intra-island workers.
self._receive_intra_island_individuals()

# Emigration: Check for emigrants from other intra-island workers to be deactivated.
self._deactivate_emigrants()
if debug == 2:
check = self._check_emigrants_to_deactivate()
assert check is False
# Migration.
if migration:
# Emigration: Island sends individuals out.
# Happens on per-worker basis with certain probability.
if self.rng.random() < self.migration_prob:
self._send_emigrants(f)

if dump: # Dump checkpoint.
self._dump_checkpoint()
# Immigration: Check for incoming individuals from other islands.
self._receive_immigrants(f)

dump = (
self._determine_worker_dumping_next()
) # Determine worker dumping checkpoint in the next generation.
self.generation += 1 # Go to next generation.
# Emigration: Check for emigrants from other intra-island workers to be deactivated.
self._deactivate_emigrants()
if debug == 2:
check = self._check_emigrants_to_deactivate()
assert check is False
self.generation += 1 # Go to next generation.

# Having completed all generations, the workers have to wait for each other.
# Once all workers are done, they should check for incoming messages once again
Expand All @@ -486,7 +492,10 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None:

if migration:
# Final check for incoming individuals from other islands.
self._receive_immigrants()
with h5py.File(
self.checkpoint_path, "a", driver="mpio", comm=self.propulate_comm
) as f:
self._receive_immigrants(f)
self.propulate_comm.barrier()

# Emigration: Final check for emigrants from other intra-island workers to be deactivated.
Expand All @@ -507,12 +516,3 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None:
raise ValueError(
"There should not be any individuals left that need to be deactivated."
)

self.propulate_comm.barrier()

# Final checkpointing on rank 0.
if self.island_comm.rank == 0:
self._dump_final_checkpoint() # Dump checkpoint.
self.propulate_comm.barrier()
_ = self._determine_worker_dumping_next()
self.propulate_comm.barrier()
71 changes: 34 additions & 37 deletions propulate/pollinator.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import copy
import logging
import random
import time
from pathlib import Path
from typing import Callable, Generator, List, Optional, Tuple, Type, Union

import h5py
import numpy as np
from mpi4py import MPI

Expand Down Expand Up @@ -204,7 +206,8 @@ def _send_emigrants(self) -> None:
f"to select {num_emigrants} migrants."
)

def _receive_immigrants(self) -> None:
# TODO implement checkpoint update
def _receive_immigrants(self, hdf5_checkpoint: h5py.File) -> None:
"""Check for and possibly receive immigrants send by other islands."""
replace_num = 0
log_string = (
Expand Down Expand Up @@ -400,53 +403,54 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None:
debug : int, optional
The debug level; 0 - silent; 1 - moderate, 2 - noisy (debug mode). Default is 1.
"""
self.start_time = time.time_ns()
if self.worker_sub_comm != MPI.COMM_SELF:
self.generation = self.worker_sub_comm.bcast(self.generation, root=0)
if self.propulate_comm is None:
while self.generations <= -1 or self.generation < self.generations:
# Breed and evaluate individual.
self._evaluate_individual()
# TODO this should be refactored, the subworkers don't need the logfile
# TODO this needs to be addressed before merge, since multirank workers should fail with this
self._evaluate_individual(None)
self.generation += 1
return
if self.island_comm.rank == 0:
log.info(f"Island {self.island_idx} has {self.island_comm.size} workers.")

dump = True if self.island_comm.rank == 0 else False
migration = True if self.migration_prob > 0 else False
self.propulate_comm.barrier()

# Loop over generations.
while self.generations <= -1 or self.generation < self.generations:
if debug == 1 and self.generation % int(logging_interval) == 0:
log.info(
f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..."
)

# Breed and evaluate individual.
self._evaluate_individual()
# TODO this should probably be refactored, checkpointing can probably be handled in one place
with h5py.File(
self.checkpoint_path, "a", driver="mpio", comm=self.propulate_comm
) as f:
while self.generation < self.generations:
if self.generation % int(logging_interval) == 0:
log.info(
f"Island {self.island_idx} Worker {self.island_comm.rank}: In generation {self.generation}..."
)

# Check for and possibly receive incoming individuals from other intra-island workers.
self._receive_intra_island_individuals()
# Breed and evaluate individual.
self._evaluate_individual(f)

if migration:
# Emigration: Island sends individuals out.
# Happens on per-worker basis with certain probability.
if self.rng.random() < self.migration_prob:
self._send_emigrants()
# Check for and possibly receive incoming individuals from other intra-island workers.
self._receive_intra_island_individuals()

# Immigration: Island checks for incoming individuals from other islands.
self._receive_immigrants()
if migration:
# Emigration: Island sends individuals out.
# Happens on per-worker basis with certain probability.
if self.rng.random() < self.migration_prob:
self._send_emigrants()

# Immigration: Check for individuals replaced by other intra-island workers to be deactivated.
self._deactivate_replaced_individuals()
# Immigration: Island checks for incoming individuals from other islands.
# TODO this should probably update the checkpoint so it needs to pass the handle
self._receive_immigrants(None)

if dump: # Dump checkpoint.
self._dump_checkpoint()
# Immigration: Check for individuals replaced by other intra-island workers to be deactivated.
self._deactivate_replaced_individuals()

dump = (
self._determine_worker_dumping_next()
) # Determine worker dumping checkpoint in the next generation.
self.generation += 1 # Go to next generation.
self.generation += 1 # Go to next generation.

# Having completed all generations, the workers have to wait for each other.
# Once all workers are done, they should check for incoming messages once again
Expand All @@ -464,7 +468,8 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None:

if migration:
# Final check for incoming individuals from other islands.
self._receive_immigrants()
# TODO this needs to update the checkpoint
self._receive_immigrants(None)
self.propulate_comm.barrier()

# Immigration: Final check for individuals replaced by other intra-island workers to be deactivated.
Expand All @@ -477,11 +482,3 @@ def propulate(self, logging_interval: int = 10, debug: int = 1) -> None:
f"Finally {len(self.replaced)} individual(s) in replaced: {self.replaced}:\n{self.population}"
)
self._deactivate_replaced_individuals()
self.propulate_comm.barrier()

# Final checkpointing on rank 0.
if self.island_comm.rank == 0:
self._dump_final_checkpoint() # Dump checkpoint.
self.propulate_comm.barrier()
_ = self._determine_worker_dumping_next()
self.propulate_comm.barrier()
13 changes: 7 additions & 6 deletions propulate/population.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def __init__(
self.migration_history: str = "" # migration history
self.evaltime = float("inf") # evaluation time
self.evalperiod = 0.0 # evaluation duration
self.island_rank = 0 # rank in the island comm

# NOTE needed for PSO type propagators
self.velocity = velocity
Expand All @@ -99,11 +100,11 @@ def __getitem__(self, key: str) -> Union[float, int, str]:
return self.mapping[key]
else:
# continuous variable
if self.types[key] == float:
if self.types[key] is float:
return float(self.position[self.offsets[key]].item())
elif self.types[key] == int:
elif self.types[key] is int:
return int(np.rint(self.position[self.offsets[key]]).item())
elif self.types[key] == str:
elif self.types[key] is str:
offset = self.offsets[key]
upper = self.offsets[key] + len(self.limits[key])
return str(
Expand All @@ -120,13 +121,13 @@ def __setitem__(self, key: str, newvalue: Union[float, int, str, Any]) -> None:
else:
if key not in self.limits:
raise ValueError("Unknown gene.")
if self.types[key] == float:
if self.types[key] is float:
assert isinstance(newvalue, float)
self.position[self.offsets[key]] = newvalue
elif self.types[key] == int:
elif self.types[key] is int:
assert isinstance(newvalue, int)
self.position[self.offsets[key]] = float(newvalue)
elif self.types[key] == str:
elif self.types[key] is str:
assert newvalue in self.limits[key]
offset = self.offsets[key]
upper = len(self.limits[key])
Expand Down
8 changes: 8 additions & 0 deletions propulate/propagators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ def __init__(
if rng is None:
rng = random.Random()
self.rng = rng # Random number generator
# TODO
self.limits: Mapping[
str, Union[Tuple[float, float], Tuple[int, int], Tuple[str, ...]]
] = dict()

def __call__(self, inds: List[Individual]) -> Union[List[Individual], Individual]:
"""
Expand Down Expand Up @@ -177,6 +181,9 @@ class Conditional(Propagator):

def __init__(
self,
limits: Mapping[
str, Union[Tuple[float, float], Tuple[int, int], Tuple[str, ...]]
],
pop_size: int,
true_prop: Propagator,
false_prop: Propagator,
Expand All @@ -203,6 +210,7 @@ def __init__(
self.pop_size = pop_size
self.true_prop = true_prop
self.false_prop = false_prop
self.limits = limits

def __call__(self, inds: List[Individual]) -> Union[List[Individual], Individual]:
"""
Expand Down
1 change: 1 addition & 0 deletions propulate/propagators/cmaes.py
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,7 @@ def __init__(
The separate random number generator for the Propulate optimization.
"""
self.adapter = adapter
self.limits = limits
problem_dimension = len(limits)
# Number of individuals considered for each generation
lambd = (
Expand Down
2 changes: 1 addition & 1 deletion propulate/propagators/pso.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ def __call__(self, individuals: List[Individual]) -> Individual:
individuals : List[propulate.population.Individual]
The list of individuals that must at least contain one individual that belongs to the propagator.
This list is used to calculate personal and global best of the particle and the swarm,
respectively, and then to update the particle based on the retrieved results.
respectively, and then to update the particle based on the retrieved results.
cannot be used as ``Individual`` objects are converted to particles first.

Returns
Expand Down
Loading
Loading