-
Notifications
You must be signed in to change notification settings - Fork 32
/
Copy pathwatch_best_robots.py
executable file
·84 lines (69 loc) · 3.17 KB
/
watch_best_robots.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env python3
import asyncio
from pygazebo.pygazebo import DisconnectError
from pyrevolve import parser
from pyrevolve.evolution import fitness
from pyrevolve.evolution.selection import multiple_selection, tournament_selection
from pyrevolve.evolution.population import Population, PopulationConfig
from pyrevolve.evolution.pop_management.steady_state import steady_state_population_management
from pyrevolve.experiment_management import ExperimentManagement
from pyrevolve.genotype.plasticoding.crossover.crossover import CrossoverConfig
from pyrevolve.genotype.plasticoding.crossover.standard_crossover import standard_crossover
from pyrevolve.genotype.plasticoding.initialization import random_initialization
from pyrevolve.genotype.plasticoding.mutation.mutation import MutationConfig
from pyrevolve.genotype.plasticoding.mutation.standard_mutation import standard_mutation
from pyrevolve.genotype.plasticoding.plasticoding import PlasticodingConfig
from pyrevolve.util.supervisor.simulator_queue import SimulatorQueue
import numpy as np
async def run():
    """
    Re-evaluate the best robots of a saved generation.

    Builds the evolution configuration, starts the simulator queue, loads the
    snapshot of ``generation`` into a fresh population, keeps at most
    ``max_best`` individuals with the highest fitness and passes them to
    ``population.evaluate`` again.

    Edit ``generation`` and ``max_best`` in the body to choose which snapshot
    is loaded and how many individuals are kept.
    """
    genotype_conf = PlasticodingConfig(
        max_structural_modules=100,
    )

    mutation_conf = MutationConfig(
        mutation_prob=0.8,
        genotype_conf=genotype_conf,
    )

    crossover_conf = CrossoverConfig(
        crossover_prob=0.8,
    )

    # Parse command line / file input arguments.
    # Parsed once only: the same settings object feeds the experiment
    # management, the population configuration and the simulator queue.
    settings = parser.parse_args()
    experiment_management = ExperimentManagement(settings)

    population_conf = PopulationConfig(
        population_size=100,
        genotype_constructor=random_initialization,
        genotype_conf=genotype_conf,
        fitness_function=fitness.displacement_velocity_hill,
        mutation_operator=standard_mutation,
        mutation_conf=mutation_conf,
        crossover_operator=standard_crossover,
        crossover_conf=crossover_conf,
        selection=lambda individuals: tournament_selection(individuals, 2),
        parent_selection=lambda individuals: multiple_selection(individuals, 2, tournament_selection),
        population_management=steady_state_population_management,
        population_management_selector=tournament_selection,
        evaluation_time=settings.evaluation_time,
        offspring_size=50,
        experiment_name=settings.experiment_name,
        experiment_management=experiment_management,
    )

    simulator_queue = SimulatorQueue(settings.n_cores, settings, settings.port_start)
    await simulator_queue.start()

    population = Population(population_conf, simulator_queue, 0)

    # Choose the snapshot (generation) to load here, and the maximum number
    # of best individuals you wish to watch.
    generation = 100
    max_best = 10

    await population.load_snapshot(generation)

    fitnesses = [individual.fitness for individual in population.individuals]
    best = _best_indices(fitnesses, max_best)

    # Build a plain list rather than a NumPy object array, so the population
    # container is not silently converted to an ndarray. The order is
    # ascending by fitness, i.e. the best individual comes last.
    population.individuals = [population.individuals[i] for i in best]

    await population.evaluate(population.individuals, generation)


def _best_indices(fitnesses, max_best):
    """Return the indices of the ``max_best`` highest fitnesses, ascending."""
    # NOTE(review): assumes an individual without a result may carry
    # ``fitness is None``; such entries are ranked lowest instead of making
    # the sort fail on a None/float comparison. Confirm against Individual.
    scores = np.array(
        [float("-inf") if value is None else value for value in fitnesses],
        dtype=float,
    )
    order = np.argsort(scores)

    # Clamp the count so that a population smaller than ``max_best`` yields
    # every individual. An unclamped ``len - max_best`` start would be
    # negative and be interpreted as an offset from the end of the array.
    keep = max(0, min(max_best, len(order)))
    return order[len(order) - keep:]