-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathga.py
129 lines (108 loc) · 3.87 KB
/
ga.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from operator import attrgetter
from parameters import *
from random import randint
from individual import Individual
from functools import reduce
class GA:
    """Genetic-algorithm driver operating on a population of ``Individual``.

    The population is kept sorted by ``fitness`` in descending order, so
    index 0 is always the current best individual.  Parents are drawn with a
    rank-based roulette wheel: the individual at rank ``i`` owns
    ``popsize - i`` slots of the wheel, so better-ranked individuals are
    selected more often.

    NOTE: the problem data, expected result and mutation rate are stored as
    class attributes on ``Individual``; they are therefore shared by every
    ``GA`` instance living in the same process.
    """

    # Number of top-ranked individuals copied unchanged into the next
    # generation (elitism).
    ELITE_COUNT = 3

    def __init__(
        self, data, expected_result, popsize=50, mut_rate=20, GA_printers=None
    ):
        """Create and rank a random initial population.

        Args:
            data: stored on ``Individual.data``.
            expected_result: stored on ``Individual.expected_result`` under
                the ``"rowdata"`` key.
            popsize: number of individuals in the population.
            mut_rate: initial value of ``Individual.mutation_rate``.
            GA_printers: optional iterable of objects exposing
                ``refresh_screen(ga)``; called after every generation.
                ``None`` means no reporting.
        """
        Individual.expected_result = {"rowdata": expected_result}
        Individual.mutation_rate = mut_rate
        Individual.data = data
        self.popsize = popsize
        self.population = []
        for _ in range(popsize):
            candidate = Individual()
            candidate.randomize()
            self.population.append(candidate)
        self.population.sort(key=attrgetter("fitness"), reverse=True)
        self.GA_printers = GA_printers
        self.generation = 0
        # Total number of slots on the wheel: popsize + (popsize - 1) + ... + 1.
        self.wheel_size = sum(range(1, popsize + 1))
        self.wheel = []
        self.__init_wheel()
        # Per-rank counter of how many times each rank was picked as a parent.
        self.test = [0] * self.popsize

    def __str__(self):
        """Return every individual's ``str()``, one per line.

        Always returns a string, including for an empty or single-element
        population.
        """
        return "\n".join(str(individual) for individual in self.population)

    def __init_wheel(self):
        """Fill ``self.wheel`` so that rank ``i`` appears ``popsize - i`` times."""
        for rank in range(self.popsize):
            self.wheel.extend([rank] * (self.popsize - rank))

    def __my_choice(self):
        """Pick one individual by spinning the rank-based wheel."""
        slot = randint(0, self.wheel_size - 1)
        rank = self.wheel[slot]
        self.test[rank] += 1
        return self.population[rank]

    def check_duplicate(self, element):
        """Return True if at least two individuals share ``hash(element)``."""
        target = hash(element)
        matches = sum(1 for other in self.population if hash(other) == target)
        return matches >= 2

    def randomize_duplicates(self):
        """Re-randomize every individual (except index 0) that has a duplicate.

        The duplicate count is deliberately recomputed for each individual:
        once one copy has been changed, its former twin may no longer be a
        duplicate and is then left alone.
        """
        for individual in self.population[1:]:
            if self.check_duplicate(individual):
                individual.mutate(True)
                individual.randomize()

    def apply_max(self):
        """Call ``apply_max()`` on every individual except the one at index 0."""
        for individual in self.population[1:]:
            individual.apply_max()

    def evaluate_all(self):
        """Call ``evaluate()`` on every individual of the population."""
        for individual in self.population:
            individual.evaluate()

    def run_once(self):
        """Produce the next generation and re-rank the population.

        Steps: keep the ``ELITE_COUNT`` best individuals, breed and mutate
        the rest from wheel-selected parents, evaluate, break up duplicates,
        apply ``apply_max``, evaluate again, sort by descending fitness and
        finally raise the shared mutation rate by ``MUTATION_RATE_INC``.
        """
        elite = self.ELITE_COUNT
        # Elitism: the best individuals survive unchanged.
        newpop = self.population[:elite]
        for _ in range(elite, self.popsize):
            child = Individual(mother=self.__my_choice(), father=self.__my_choice())
            child.mutate()
            newpop.append(child)
        self.population = newpop
        self.evaluate_all()
        self.randomize_duplicates()
        self.apply_max()
        # Second evaluation because the two previous steps modify individuals.
        self.evaluate_all()
        self.population.sort(key=attrgetter("fitness"), reverse=True)
        self.inc_mutation_rate(MUTATION_RATE_INC)

    def inc_mutation_rate(self, mutation_rate):
        """Add ``mutation_rate`` to the shared ``Individual.mutation_rate``."""
        Individual.mutation_rate += mutation_rate

    def end(self, outfilename):
        """Report the best individual and dump it to ``outfilename``.

        Prints the best individual's ``crits["rowdata"]``, its ``st()``
        output and the ratio of elapsed generations to this GA's population
        size, then calls ``best.dump(outfilename)``.
        """
        self.evaluate_all()
        self.population.sort(key=attrgetter("fitness"), reverse=True)
        best = self.population[0]
        print(best.crits["rowdata"])
        print(best.st())
        # Use this instance's population size rather than a module global so
        # the ratio is right for any ``popsize`` passed to the constructor.
        print(
            "ratio generation/population_size: %.3f"
            % (self.generation / float(self.popsize))
        )
        best.dump(outfilename)

    def run(self, outfilename="best.pickled"):
        """Evolve until the best fitness equals 1.0 or the user hits Ctrl-C.

        After each generation every configured printer is refreshed.  In both
        termination cases ``end(outfilename)`` is called to report and dump
        the best individual.
        """
        try:
            while self.population[0].fitness != 1.0:
                self.run_once()
                # GA_printers defaults to None; treat that as "no printers".
                for printer in self.GA_printers or ():
                    printer.refresh_screen(self)
                self.generation += 1
            self.end(outfilename)
        except KeyboardInterrupt:
            self.end(outfilename)
class GA_print_text:
    """Plain-text progress reporter that writes to standard output."""

    def refresh_screen(self, ga):
        """Print the generation header followed by the first five individuals.

        Args:
            ga: object exposing ``generation`` and ``population``; the
                mutation rate is read from ``Individual.mutation_rate``.
        """
        header = "Generation: %d (mutation rate=%d)" % (
            ga.generation,
            Individual.mutation_rate,
        )
        print(header)
        leaders = ga.population[:5]
        for individual in leaders:
            print(individual)
def run(data, expected, outfilename):
    """Build a GA with the module-level defaults and run it to completion.

    Uses ``POP_SIZE`` and ``MUTATION_RATE`` for the population size and the
    initial mutation rate, reports progress through a ``GA_print_text``
    printer, and passes ``outfilename`` on to ``GA.run``.
    """
    printers = [GA_print_text()]
    solver = GA(
        data,
        expected,
        popsize=POP_SIZE,
        mut_rate=MUTATION_RATE,
        GA_printers=printers,
    )
    solver.run(outfilename)
if __name__ == "__main__":
    # Usage: ga.py DATA_FILE EXPECTED_FILE OUTPUT_FILE
    import sys

    # Context managers make sure both input files are closed once read.
    with open(sys.argv[1]) as stream:
        data = stream.read()
    with open(sys.argv[2]) as stream:
        expected = stream.read()
    outfilename = sys.argv[3]
    run(data, expected, outfilename)