-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathergometer.py
120 lines (96 loc) · 3.94 KB
/
ergometer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import tempfile, shutil, os, sys, signal, datetime, time, json
class Ergometer:
    """Drive an ergometer device through a training session.

    Once per second the device is polled, the current readings are echoed to
    stderr (and optionally to a log), and the device power is adjusted
    according to the session step for the current tick.

    The collaborators are duck-typed; this class relies on:

    * ``device.status()`` (presumably refreshes the readings -- confirm
      against the device driver), ``device.set_power(absolute=...)`` /
      ``device.set_power(relative=...)`` and the attributes ``pulse``,
      ``power`` and ``rpm``.
    * ``session.step(index)`` returning a mapping with ``'pulse'`` and
      ``'power'`` targets, plus the attributes ``duration`` (compared against
      elapsed seconds) and ``name``.
    """

    # Below either threshold the rider is treated as not (yet) exercising.
    MIN_RPM = 20
    MIN_PULSE = 20

    def __init__(self, device=None, session=None, log=None):
        """Store the collaborators and, if *log* is given, open a temp log.

        device  -- ergometer device object (see class docstring).
        session -- training session object (see class docstring).
        log     -- path of the JSON report written by finish(); a falsy value
                   disables logging entirely.
        """
        self.device = device
        self.session = session
        self.pc = 0  # index of the next session step to execute
        self.begin = None  # set by run() when the rider starts pedalling
        self.end = None  # set by run() when the session duration elapsed
        if log:
            # Samples are buffered in a temp file and converted to JSON in
            # finish(); delete=False because the file is reopened by name.
            self.logtemp = tempfile.NamedTemporaryFile(mode='w', delete=False)
            self.log = log
        else:
            self.logtemp = None
            self.log = None

    def _is_idle(self):
        """Return True while rpm or pulse is below the activity thresholds."""
        return (self.device.rpm < self.MIN_RPM
                or self.device.pulse < self.MIN_PULSE)

    @staticmethod
    def _relative_adjustment(diff):
        """Map a pulse error (target - actual) to a signed relative power step.

        Larger errors give larger steps; the sign follows *diff*, so a pulse
        below target yields a positive adjustment.
        """
        magnitude = abs(diff)
        if magnitude > 10.0:
            ratio = 0.30
        elif magnitude > 5.0:
            ratio = 0.15
        elif magnitude > 0.0:
            ratio = 0.03
        else:
            ratio = 0.0
        return ratio if diff > 0 else -ratio

    def _tick(self):
        """Run one control step: poll, report, log and adjust the power."""
        self.device.status()
        if self._is_idle():
            sys.stderr.write('.')
            return

        step = self.session.step(self.pc)
        sample = (int(self.device.pulse), int(step['pulse']),
                  int(self.device.power), int(self.device.rpm))
        sys.stderr.write('\n%d/%d bpm\t%d W\t%d rpm' % sample)

        # Logging is optional and best-effort: a failing disk must not abort
        # the workout, but the failure is reported instead of being hidden.
        if self.logtemp is not None and not self.logtemp.closed:
            try:
                self.logtemp.write('%d %d %d %d\n' % sample)
                self.logtemp.flush()
            except (IOError, OSError) as err:
                sys.stderr.write('\nlog write failed: %s' % err)

        self.pc += 1

        if step['power'] > 0.0:
            # Power-controlled step: apply the target directly.
            self.device.set_power(absolute=step['power'])
        elif step['pulse'] > 0.0:
            # Pulse-controlled step: nudge the power towards the target pulse.
            diff = step['pulse'] - self.device.pulse
            if -5.0 <= diff <= 0:
                # Pulse is at or up to 5 above target: leave the power alone.
                return
            self.device.set_power(relative=self._relative_adjustment(diff))

    def _on_alarm(self, signum, frame):
        """SIGALRM handler: execute one control step."""
        self._tick()

    def run(self):
        """Wait for the rider to start, run the session, then call finish().

        Blocks until ``session.duration`` seconds have elapsed since activity
        was first detected. Uses SIGALRM and an interval timer, so it has to
        be called from the main thread on a platform that provides them.
        """
        sys.stderr.write('Waiting for session to start')
        while True:
            self.device.status()
            if self._is_idle():
                sys.stderr.write('.')
                time.sleep(1.0)
            else:
                sys.stderr.write('..ok\n')
                self.begin = datetime.datetime.utcnow()
                break

        signal.signal(signal.SIGALRM, self._on_alarm)
        signal.setitimer(signal.ITIMER_REAL, 1, 1)
        try:
            # Idle here until the session is finished; the work happens in
            # the alarm handler. total_seconds() is used because
            # timedelta.seconds wraps around after one day.
            while ((datetime.datetime.utcnow() - self.begin).total_seconds()
                   < self.session.duration):
                time.sleep(0.01)
        finally:
            # Disarm the timer so no control step fires during or after
            # finish(), which closes the log and resets the device readings.
            signal.setitimer(signal.ITIMER_REAL, 0)
        self.end = datetime.datetime.utcnow()
        self.finish()

    def finish(self):
        """Reset the device readings and write the JSON report, if enabled.

        The report contains begin/end timestamps, the session duration and
        name, and one entry per logged sample. The temporary sample file is
        removed afterwards.
        """
        print('\nfinishing...')
        self.device.pulse = 0
        # NOTE(review): kept from the original; this only has an effect if
        # session.step() returns a mapping the session itself holds on to.
        self.session.step(self.pc)['pulse'] = 0
        self.device.power = 0
        self.device.rpm = 0
        if not self.log:
            return

        self.logtemp.close()
        fmt = '%Y-%m-%dT%H:%M:%S'
        data = {
            'begin': self.begin.strftime(fmt),
            'end': self.end.strftime(fmt),
            'duration': self.session.duration,
            'name': self.session.name,
            'steps': [],
        }
        with open(self.logtemp.name) as samples:
            for line in samples:
                parts = line.split()
                if len(parts) < 4:
                    # Blank or truncated line: nothing usable to report.
                    continue
                # Values are kept as strings, as in the existing report format.
                data['steps'].append({
                    'pulse': parts[0],
                    'target_pulse': parts[1],
                    'power': parts[2],
                    'rpm': parts[3],
                })
        with open(self.log, 'w') as report:
            report.write(json.dumps(data))
        os.remove(self.logtemp.name)