-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathengine.py
110 lines (82 loc) · 3.04 KB
/
engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import pyaudio
import time
import threading
import torch
import wave
import torchaudio
from model import StackedConvNet
class FEATURE_INIT(torch.nn.Module):
    """MFCC feature extractor wrapping ``torchaudio.transforms.MFCC``.

    Turns a mono waveform tensor into an 80-coefficient MFCC feature
    map, the input expected by ``StackedConvNet`` (``in_channels=80``).
    """

    def __init__(self, sample_rate):
        """Build the MFCC transform.

        sample_rate -- audio sample rate in Hz of the incoming waveform.
        """
        super().__init__()
        self.transforms = torchaudio.transforms.MFCC(
            sample_rate=sample_rate,
            n_mfcc=80,
            melkwargs={'n_mels': 80, 'win_length': 160, 'hop_length': 80})

    def forward(self, x):
        """Return MFCC features for waveform ``x``.

        Defined as ``forward`` rather than overriding ``__call__`` so that
        ``nn.Module`` hooks still run; ``instance(x)`` call syntax is
        unchanged because ``nn.Module.__call__`` dispatches here. The
        leftover debug ``print(x.shape)`` has been removed.
        """
        return self.transforms(x)
class Listener:
    """Wake-word engine: streams microphone audio and polls a model on it.

    Two daemon threads cooperate: ``listen`` appends raw PCM chunks to
    ``self.queue`` while ``make_prediction`` repeatedly writes the oldest
    ``record_seconds`` window to a temp WAV file, extracts MFCC features,
    and scores it with ``StackedConvNet``. Detection sets ``self.FLAG``,
    which stops both threads.
    """

    def __init__(self, model_file, sample_rate=16_000, record_seconds=2):
        """Open the audio stream and load the trained model.

        model_file     -- path to a saved ``StackedConvNet`` state dict.
        sample_rate    -- capture rate in Hz (default 16 kHz).
        record_seconds -- length of the audio window fed to the model.
        """
        self.chunk = 1000  # frames per stream read / per queue entry
        self.sample_rate = sample_rate
        self.record_seconds = record_seconds
        self.p = pyaudio.PyAudio()
        self.stream = self.p.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=self.sample_rate,
            input=True,
            output=True,
            frames_per_buffer=self.chunk)
        self.queue = []  # raw byte chunks, oldest first
        self.model = StackedConvNet(
            in_channels=80,
            intermediate_channels=128,
            out_channels=8,
            pool_size=45,
            embed_dim=15,
            num_layers=4,
        )
        self.model.load_state_dict(torch.load(model_file))
        self.model.eval()
        self.sampler = FEATURE_INIT(sample_rate=self.sample_rate)
        self.FLAG = False  # set True once the wake word is detected
        self.fname = "wakeword_temp"  # scratch WAV file reused every cycle

    def listen(self):
        """Append raw audio chunks to the queue until the wake word fires."""
        while not self.FLAG:
            data = self.stream.read(self.chunk, exception_on_overflow=False)
            self.queue.append(data)
            time.sleep(0.01)

    def save(self):
        """Write the oldest ``record_seconds`` of audio to disk and reload it.

        Returns the mono waveform as a 1-D tensor (mean over channels).
        """
        # BUG FIX: the queue holds chunks of ``self.chunk`` frames each, so
        # the window length is measured in chunks, not samples. The original
        # sliced by sample count (sample_rate * record_seconds = 32000
        # entries) and therefore saved the entire queue, not a 2 s window.
        num_chunks = (self.sample_rate * self.record_seconds) // self.chunk
        wf = wave.open(self.fname, "wb")
        try:
            wf.setnchannels(1)
            wf.setsampwidth(self.p.get_sample_size(pyaudio.paInt16))
            wf.setframerate(self.sample_rate)
            wf.writeframes(b"".join(self.queue[:num_chunks]))
        finally:
            # Close even if a write fails so the file handle is not leaked.
            wf.close()
        waveform, _ = torchaudio.load(self.fname)
        return waveform.mean(0)

    def make_prediction(self):
        """Score audio windows in a loop until the model reports a wake word."""
        while True:
            with torch.no_grad():  # inference only — no autograd bookkeeping
                audio_data = self.sampler(self.save()).unsqueeze(0)
                pred = self.model(audio_data)
                if pred.item() > 0.9:  # assumes a [0, 1] score — TODO confirm
                    self.FLAG = True
                    print("wakeword found", pred.item())
                    break
                # No hit: slide the window forward by dropping the oldest
                # chunk. NOTE(review): the listener appends faster than one
                # chunk per prediction, so the queue can still grow; verify.
                del self.queue[0]
                print("no wakeword found", pred.item(), len(self.queue))
            # Poll slowly while the buffer is short, quickly once it fills.
            time.sleep(0.1 if len(self.queue) < 40 else 0.01)

    def run(self):
        """Start the capture and prediction threads and block until detection."""
        listen_thread = threading.Thread(target=self.listen, daemon=True)
        listen_thread.start()
        time.sleep(2)  # let the buffer fill before the first prediction
        pred_thread = threading.Thread(target=self.make_prediction, daemon=True)
        pred_thread.start()
        print("\nWake Word Engine is now listening... \n")
        pred_thread.join()
        listen_thread.join()
Listener("model.pt").run()