-
Notifications
You must be signed in to change notification settings - Fork 1
/
classify_myo_ros.py
executable file
·71 lines (57 loc) · 2.33 KB
/
classify_myo_ros.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#!/usr/bin/env python
'''Simple ROS Node for Gesture Classification
This script provides Gesture data for self learned Gestures from the Myo bracelet to ROS.
The Gestures are learned using the train_myo_ros.py script which saves the different
Gesture informations in maximum 10 files (for each Gesture one). These files are loaded
from this script to get classification information for the Gestures. The recognized
Gestures then are published on a ros topic.
This script is based on the myo-raw project - especially on the classify_myo.py
and the myo.py files. (see https://github.com/dzhu/myo-raw/ which is available under
the MIT LICENSE)
Following changes where made:
- ros code added
- split up classifying and gesture learning
usage: python classify_myo_ros.py (but myo_ros.py must be running)
provides:
- publishes data in topics: /myo/classifier/gesture
- subscribes to: /myo/data/emg
- uses vals0.dat - vals9.dat files generated by train_myo.ros
'''
import nn_classifier
# Ros libraries
import roslib
import rospy
# emg, gesture
from std_msgs.msg import Int32, Int32MultiArray
from collections import deque
class ClassifyMyoROS:
def __init__(self):
self.nnclassifier = nn_classifier.NNClassifier()
'''Initialize ros and ros publisher, ros subscriber'''
rospy.init_node('myo_classifier', anonymous=True)
self.pub_gesture = rospy.Publisher("/myo/classifier/gesture", Int32, queue_size=1)
self.sub_emg = rospy.Subscriber("/myo/data/emg", Int32MultiArray, self.callback_emg, queue_size=1)
self.last_gesture = -1
self.gestures_queue = deque([])
self.nnclassifier.read_data()
def callback_emg(self, ros_data):
self.gestures_queue.append(self.nnclassifier.classify(ros_data.data))
if len(self.gestures_queue) >= 20:
self.gestures_queue.popleft()
if __name__ == '__main__':
c = ClassifyMyoROS()
rate = rospy.Rate(10) # 10hz
try:
while not rospy.is_shutdown():
tmp = -1
for i in range(0, 10):
count = c.gestures_queue.count(i)
if count > tmp:
tmp = i
c.last_gesture = tmp
c.pub_gesture.publish(c.last_gesture)
rate.sleep()
except KeyboardInterrupt:
pass
finally:
print()